Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(632)

Unified Diff: tools/telemetry/third_party/gsutilz/gslib/boto_translation.py

Issue 1376593003: Roll gsutil version to 4.15. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 5 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: tools/telemetry/third_party/gsutilz/gslib/boto_translation.py
diff --git a/tools/telemetry/third_party/gsutilz/gslib/boto_translation.py b/tools/telemetry/third_party/gsutilz/gslib/boto_translation.py
index 92013904c47ffa66a82776e3d2f6da29ebcff54b..edb69ddcb2e1ddab89a9982a4102c4528b058344 100644
--- a/tools/telemetry/third_party/gsutilz/gslib/boto_translation.py
+++ b/tools/telemetry/third_party/gsutilz/gslib/boto_translation.py
@@ -30,6 +30,7 @@ import re
import socket
import tempfile
import textwrap
+import threading
import time
import xml
from xml.dom.minidom import parseString as XmlParseString
@@ -74,6 +75,7 @@ from gslib.translation_helper import AclTranslation
from gslib.translation_helper import AddS3MarkerAclToObjectMetadata
from gslib.translation_helper import CorsTranslation
from gslib.translation_helper import CreateBucketNotFoundException
+from gslib.translation_helper import CreateNotFoundExceptionForObjectWrite
from gslib.translation_helper import CreateObjectNotFoundException
from gslib.translation_helper import DEFAULT_CONTENT_TYPE
from gslib.translation_helper import EncodeStringAsLong
@@ -84,10 +86,8 @@ from gslib.translation_helper import REMOVE_CORS_CONFIG
from gslib.translation_helper import S3MarkerAclFromObjectMetadata
from gslib.util import ConfigureNoOpAuthIfNeeded
from gslib.util import DEFAULT_FILE_BUFFER_SIZE
-from gslib.util import GetFileSize
from gslib.util import GetMaxRetryDelay
from gslib.util import GetNumRetries
-from gslib.util import MultiprocessingIsAvailable
from gslib.util import S3_DELETE_MARKER_GUID
from gslib.util import TWO_MIB
from gslib.util import UnaryDictToXml
@@ -101,8 +101,11 @@ TRANSLATABLE_BOTO_EXCEPTIONS = (boto.exception.BotoServerError,
boto.exception.StorageCreateError,
boto.exception.StorageResponseError)
-# If multiprocessing is available, this will be overridden to a (thread-safe)
-# multiprocessing.Value in a call to InitializeMultiprocessingVariables.
+# pylint: disable=global-at-module-level
+global boto_auth_initialized, boto_auth_initialized_lock
+# If multiprocessing is available, these will be overridden to process-safe
+# variables in InitializeMultiprocessingVariables.
+boto_auth_initialized_lock = threading.Lock()
boto_auth_initialized = False
NON_EXISTENT_OBJECT_REGEX = re.compile(r'.*non-\s*existent\s*object',
@@ -111,16 +114,36 @@ NON_EXISTENT_OBJECT_REGEX = re.compile(r'.*non-\s*existent\s*object',
MD5_REGEX = re.compile(r'^"*[a-fA-F0-9]{32}"*$')
-def InitializeMultiprocessingVariables():
+def InitializeMultiprocessingVariables(): # pylint: disable=invalid-name
"""Perform necessary initialization for multiprocessing.
See gslib.command.InitializeMultiprocessingVariables for an explanation
of why this is necessary.
"""
- global boto_auth_initialized # pylint: disable=global-variable-undefined
+ # pylint: disable=global-variable-undefined
+ global boto_auth_initialized, boto_auth_initialized_lock
+ boto_auth_initialized_lock = gslib.util.CreateLock()
boto_auth_initialized = multiprocessing.Value('i', 0)
+class DownloadProxyCallbackHandler(object):
+ """Intermediary callback to keep track of the number of bytes downloaded."""
+
+ def __init__(self, start_byte, callback):
+ self._start_byte = start_byte
+ self._callback = callback
+
+ def call(self, bytes_downloaded, total_size):
+ """Saves necessary data and then calls the given Cloud API callback.
+
+ Args:
+ bytes_downloaded: Number of bytes processed so far.
+ total_size: Total size of the ongoing operation.
+ """
+ if self._callback:
+ self._callback(self._start_byte + bytes_downloaded, total_size)
+
+
class BotoTranslation(CloudApi):
"""Boto-based XML translation implementation of gsutil Cloud API.
@@ -130,7 +153,7 @@ class BotoTranslation(CloudApi):
"""
def __init__(self, bucket_storage_uri_class, logger, provider=None,
- credentials=None, debug=0):
+ credentials=None, debug=0, trace_token=None):
"""Performs necessary setup for interacting with the cloud storage provider.
Args:
@@ -142,17 +165,19 @@ class BotoTranslation(CloudApi):
the provider argument and use this one instead.
credentials: Unused.
debug: Debug level for the API implementation (0..3).
+ trace_token: Unused in this subclass.
"""
super(BotoTranslation, self).__init__(bucket_storage_uri_class, logger,
provider=provider, debug=debug)
_ = credentials
- global boto_auth_initialized # pylint: disable=global-variable-undefined
- if MultiprocessingIsAvailable()[0] and not boto_auth_initialized.value:
- ConfigureNoOpAuthIfNeeded()
- boto_auth_initialized.value = 1
- elif not boto_auth_initialized:
+ # pylint: disable=global-variable-undefined, global-variable-not-assigned
+ global boto_auth_initialized, boto_auth_initialized_lock
+ with boto_auth_initialized_lock:
ConfigureNoOpAuthIfNeeded()
- boto_auth_initialized = True
+ if isinstance(boto_auth_initialized, bool):
+ boto_auth_initialized = True
+ else:
+ boto_auth_initialized.value = 1
self.api_version = boto.config.get_value(
'GSUtil', 'default_api_version', '1')
@@ -318,8 +343,10 @@ class BotoTranslation(CloudApi):
_ = provider
get_fields = self._ListToGetFields(list_fields=fields)
bucket_uri = self._StorageUriForBucket(bucket_name)
- prefix_list = []
headers = {}
+ yield_prefixes = fields is None or 'prefixes' in fields
+ yield_objects = fields is None or any(
+ field.startswith('items/') for field in fields)
self._AddApiVersionToHeaders(headers)
try:
objects_iter = bucket_uri.list_bucket(prefix=prefix or '',
@@ -331,11 +358,10 @@ class BotoTranslation(CloudApi):
try:
for key in objects_iter:
- if isinstance(key, Prefix):
- prefix_list.append(key.name)
+ if yield_prefixes and isinstance(key, Prefix):
yield CloudApi.CsObjectOrPrefix(key.name,
CloudApi.CsObjectOrPrefixType.PREFIX)
- else:
+ elif yield_objects:
key_to_convert = key
# Listed keys are populated with these fields during bucket listing.
@@ -407,11 +433,11 @@ class BotoTranslation(CloudApi):
self._AddApiVersionToHeaders(headers)
if 'accept-encoding' not in headers:
headers['accept-encoding'] = 'gzip'
- if end_byte:
+ if end_byte is not None:
headers['range'] = 'bytes=%s-%s' % (start_byte, end_byte)
elif start_byte > 0:
headers['range'] = 'bytes=%s-' % (start_byte)
- else:
+ elif start_byte < 0:
headers['range'] = 'bytes=%s' % (start_byte)
# Since in most cases we already made a call to get the object metadata,
@@ -443,7 +469,8 @@ class BotoTranslation(CloudApi):
try:
if download_strategy is CloudApi.DownloadStrategy.RESUMABLE:
self._PerformResumableDownload(
- download_stream, key, headers=headers, callback=progress_callback,
+ download_stream, start_byte, end_byte, key,
+ headers=headers, callback=progress_callback,
num_callbacks=num_progress_callbacks, hash_algs=hash_algs)
elif download_strategy is CloudApi.DownloadStrategy.ONE_SHOT:
self._PerformSimpleDownload(
@@ -505,13 +532,16 @@ class BotoTranslation(CloudApi):
key.get_contents_to_file(download_stream, cb=progress_callback,
num_cb=num_progress_callbacks, headers=headers)
- def _PerformResumableDownload(self, fp, key, headers=None, callback=None,
+ def _PerformResumableDownload(self, fp, start_byte, end_byte, key,
+ headers=None, callback=None,
num_callbacks=XML_PROGRESS_CALLBACKS,
hash_algs=None):
"""Downloads bytes from key to fp, resuming as needed.
Args:
- fp: File pointer into which data should be downloaded
+ fp: File pointer into which data should be downloaded.
+ start_byte: Start byte of the download.
+ end_byte: End byte of the download.
key: Key object from which data is to be downloaded
headers: Headers to send when retrieving the file
callback: (optional) a callback function that will be called to report
@@ -540,37 +570,22 @@ class BotoTranslation(CloudApi):
num_retries = GetNumRetries()
progress_less_iterations = 0
+ last_progress_byte = start_byte
while True: # Retry as long as we're making progress.
- had_file_bytes_before_attempt = GetFileSize(fp)
try:
- cur_file_size = GetFileSize(fp, position_to_eof=True)
-
- def DownloadProxyCallback(total_bytes_downloaded, total_size):
- """Translates a boto callback into a gsutil Cloud API callback.
-
- Callbacks are originally made by boto.s3.Key.get_file(); here we take
- into account that we're resuming a download.
-
- Args:
- total_bytes_downloaded: Actual bytes downloaded so far, not
- including the point we resumed from.
- total_size: Total size of the download.
- """
- if callback:
- callback(cur_file_size + total_bytes_downloaded, total_size)
-
+ cb_handler = DownloadProxyCallbackHandler(start_byte, callback)
headers = headers.copy()
- headers['Range'] = 'bytes=%d-%d' % (cur_file_size, key.size - 1)
- cb = DownloadProxyCallback
+ headers['Range'] = 'bytes=%d-%d' % (start_byte, end_byte)
# Disable AWSAuthConnection-level retry behavior, since that would
# cause downloads to restart from scratch.
try:
- key.get_file(fp, headers, cb, num_callbacks, override_num_retries=0,
- hash_algs=hash_algs)
+ key.get_file(fp, headers, cb_handler.call, num_callbacks,
+ override_num_retries=0, hash_algs=hash_algs)
except TypeError:
- key.get_file(fp, headers, cb, num_callbacks, override_num_retries=0)
+ key.get_file(fp, headers, cb_handler.call, num_callbacks,
+ override_num_retries=0)
fp.flush()
# Download succeeded.
return
@@ -583,9 +598,10 @@ class BotoTranslation(CloudApi):
# so we need to close and reopen the key before resuming
# the download.
if self.provider == 's3':
- key.get_file(fp, headers, cb, num_callbacks, override_num_retries=0)
+ key.get_file(fp, headers, cb_handler.call, num_callbacks,
+ override_num_retries=0)
else: # self.provider == 'gs'
- key.get_file(fp, headers, cb, num_callbacks,
+ key.get_file(fp, headers, cb_handler.call, num_callbacks,
override_num_retries=0, hash_algs=hash_algs)
except BotoResumableDownloadException, e:
if (e.disposition ==
@@ -597,7 +613,9 @@ class BotoTranslation(CloudApi):
'retry', e.message)
# At this point we had a re-tryable failure; see if made progress.
- if GetFileSize(fp) > had_file_bytes_before_attempt:
+ start_byte = fp.tell()
+ if start_byte > last_progress_byte:
+ last_progress_byte = start_byte
progress_less_iterations = 0
else:
progress_less_iterations += 1
@@ -819,8 +837,11 @@ class BotoTranslation(CloudApi):
return self._HandleSuccessfulUpload(dst_uri, object_metadata,
fields=fields)
except TRANSLATABLE_BOTO_EXCEPTIONS, e:
+ not_found_exception = CreateNotFoundExceptionForObjectWrite(
+ self.provider, object_metadata.bucket)
self._TranslateExceptionAndRaise(e, bucket_name=object_metadata.bucket,
- object_name=object_metadata.name)
+ object_name=object_metadata.name,
+ not_found_exception=not_found_exception)
def UploadObjectStreaming(self, upload_stream, object_metadata,
canned_acl=None, progress_callback=None,
@@ -836,8 +857,11 @@ class BotoTranslation(CloudApi):
return self._HandleSuccessfulUpload(dst_uri, object_metadata,
fields=fields)
except TRANSLATABLE_BOTO_EXCEPTIONS, e:
+ not_found_exception = CreateNotFoundExceptionForObjectWrite(
+ self.provider, object_metadata.bucket)
self._TranslateExceptionAndRaise(e, bucket_name=object_metadata.bucket,
- object_name=object_metadata.name)
+ object_name=object_metadata.name,
+ not_found_exception=not_found_exception)
def UploadObject(self, upload_stream, object_metadata, canned_acl=None,
preconditions=None, size=None, progress_callback=None,
@@ -860,8 +884,11 @@ class BotoTranslation(CloudApi):
return self._HandleSuccessfulUpload(dst_uri, object_metadata,
fields=fields)
except TRANSLATABLE_BOTO_EXCEPTIONS, e:
+ not_found_exception = CreateNotFoundExceptionForObjectWrite(
+ self.provider, object_metadata.bucket)
self._TranslateExceptionAndRaise(e, bucket_name=object_metadata.bucket,
- object_name=object_metadata.name)
+ object_name=object_metadata.name,
+ not_found_exception=not_found_exception)
def DeleteObject(self, bucket_name, object_name, preconditions=None,
generation=None, provider=None):
@@ -920,8 +947,13 @@ class BotoTranslation(CloudApi):
return self._BotoKeyToObject(new_key, fields=fields)
except TRANSLATABLE_BOTO_EXCEPTIONS, e:
- self._TranslateExceptionAndRaise(e, dst_obj_metadata.bucket,
- dst_obj_metadata.name)
+ not_found_exception = CreateNotFoundExceptionForObjectWrite(
+ self.provider, dst_obj_metadata.bucket, src_provider=self.provider,
+ src_bucket_name=src_obj_metadata.bucket,
+ src_object_name=src_obj_metadata.name, src_generation=src_generation)
+ self._TranslateExceptionAndRaise(e, bucket_name=dst_obj_metadata.bucket,
+ object_name=dst_obj_metadata.name,
+ not_found_exception=not_found_exception)
def ComposeObject(self, src_objs_metadata, dst_obj_metadata,
preconditions=None, provider=None, fields=None):
@@ -1382,7 +1414,7 @@ class BotoTranslation(CloudApi):
raise
def _TranslateExceptionAndRaise(self, e, bucket_name=None, object_name=None,
- generation=None):
+ generation=None, not_found_exception=None):
"""Translates a Boto exception and raises the translated or original value.
Args:
@@ -1390,6 +1422,7 @@ class BotoTranslation(CloudApi):
bucket_name: Optional bucket name in request that caused the exception.
object_name: Optional object name in request that caused the exception.
generation: Optional generation in request that caused the exception.
+ not_found_exception: Optional exception to raise in the not-found case.
Raises:
Translated CloudApi exception, or the original exception if it was not
@@ -1397,14 +1430,14 @@ class BotoTranslation(CloudApi):
"""
translated_exception = self._TranslateBotoException(
e, bucket_name=bucket_name, object_name=object_name,
- generation=generation)
+ generation=generation, not_found_exception=not_found_exception)
if translated_exception:
raise translated_exception
else:
raise
def _TranslateBotoException(self, e, bucket_name=None, object_name=None,
- generation=None):
+ generation=None, not_found_exception=None):
"""Translates boto exceptions into their gsutil Cloud API equivalents.
Args:
@@ -1412,6 +1445,7 @@ class BotoTranslation(CloudApi):
bucket_name: Optional bucket name in request that caused the exception.
object_name: Optional object name in request that caused the exception.
generation: Optional generation in request that caused the exception.
+ not_found_exception: Optional exception to raise in the not-found case.
Returns:
CloudStorageApiServiceException for translatable exceptions, None
@@ -1425,14 +1459,20 @@ class BotoTranslation(CloudApi):
elif e.status == 401 or e.status == 403:
return AccessDeniedException(e.code, status=e.status, body=e.body)
elif e.status == 404:
- if bucket_name:
+ if not_found_exception:
+ # The exception is pre-constructed prior to translation; the HTTP
+ # status code isn't available at that time.
+ setattr(not_found_exception, 'status', e.status)
+ return not_found_exception
+ elif bucket_name:
if object_name:
return CreateObjectNotFoundException(e.status, self.provider,
bucket_name, object_name,
generation=generation)
return CreateBucketNotFoundException(e.status, self.provider,
bucket_name)
- return NotFoundException(e.code, status=e.status, body=e.body)
+ return NotFoundException(e.message, status=e.status, body=e.body)
+
elif e.status == 409 and e.code and 'BucketNotEmpty' in e.code:
return NotEmptyException('BucketNotEmpty (%s)' % bucket_name,
status=e.status, body=e.body)

Powered by Google App Engine
This is Rietveld 408576698