Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(71)

Unified Diff: tools/telemetry/third_party/gsutilz/gslib/gcs_json_api.py

Issue 1376593003: Roll gsutil version to 4.15. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 5 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: tools/telemetry/third_party/gsutilz/gslib/gcs_json_api.py
diff --git a/tools/telemetry/third_party/gsutilz/gslib/gcs_json_api.py b/tools/telemetry/third_party/gsutilz/gslib/gcs_json_api.py
index e57671d35da9d677e9aeb92f30c676193a509ca5..d16fae974c4af2c6f96c8fe3f56862244caf436a 100644
--- a/tools/telemetry/third_party/gsutilz/gslib/gcs_json_api.py
+++ b/tools/telemetry/third_party/gsutilz/gslib/gcs_json_api.py
@@ -36,6 +36,7 @@ import boto
from boto import config
from gcs_oauth2_boto_plugin import oauth2_helper
import httplib2
+import oauth2client
from oauth2client import devshell
from oauth2client import multistore_file
@@ -73,8 +74,10 @@ from gslib.tracker_file import HashRewriteParameters
from gslib.tracker_file import ReadRewriteTrackerFile
from gslib.tracker_file import WriteRewriteTrackerFile
from gslib.translation_helper import CreateBucketNotFoundException
+from gslib.translation_helper import CreateNotFoundExceptionForObjectWrite
from gslib.translation_helper import CreateObjectNotFoundException
from gslib.translation_helper import DEFAULT_CONTENT_TYPE
+from gslib.translation_helper import PRIVATE_DEFAULT_OBJ_ACL
from gslib.translation_helper import REMOVE_CORS_CONFIG
from gslib.util import GetBotoConfigFileList
from gslib.util import GetCertsFile
@@ -132,7 +135,7 @@ class GcsJsonApi(CloudApi):
"""Google Cloud Storage JSON implementation of gsutil Cloud API."""
def __init__(self, bucket_storage_uri_class, logger, provider=None,
- credentials=None, debug=0):
+ credentials=None, debug=0, trace_token=None):
"""Performs necessary setup for interacting with Google Cloud Storage.
Args:
@@ -142,6 +145,7 @@ class GcsJsonApi(CloudApi):
credentials: Credentials to be used for interacting with Google Cloud
Storage.
debug: Debug level for the API implementation (0..3).
+ trace_token: Trace token to pass to the API implementation.
"""
# TODO: Plumb host_header for perfdiag / test_perfdiag.
# TODO: Add jitter to apitools' http_wrapper retry mechanism.
@@ -163,6 +167,22 @@ class GcsJsonApi(CloudApi):
self.certs_file = GetCertsFile()
self.http = GetNewHttp()
+
+ # Re-use download and upload connections. This class is only called
+ # sequentially, but we can share TCP warmed-up connections across calls.
+ self.download_http = self._GetNewDownloadHttp()
+ self.upload_http = self._GetNewUploadHttp()
+ if self.credentials:
+ self.authorized_download_http = self.credentials.authorize(
+ self.download_http)
+ self.authorized_upload_http = self.credentials.authorize(self.upload_http)
+ else:
+ self.authorized_download_http = self.download_http
+ self.authorized_upload_http = self.upload_http
+
+ WrapDownloadHttpRequest(self.authorized_download_http)
+ WrapUploadHttpRequest(self.authorized_upload_http)
+
self.http_base = 'https://'
gs_json_host = config.get('Credentials', 'gs_json_host', None)
self.host_base = gs_json_host or 'www.googleapis.com'
@@ -195,19 +215,27 @@ class GcsJsonApi(CloudApi):
self.url_base = (self.http_base + self.host_base + self.host_port + '/' +
'storage/' + self.api_version + '/')
+ credential_store_key_dict = self._GetCredentialStoreKeyDict(
+ self.credentials)
+
self.credentials.set_store(
- multistore_file.get_credential_storage_custom_string_key(
- GetCredentialStoreFilename(), self.api_version))
+ multistore_file.get_credential_storage_custom_key(
+ GetCredentialStoreFilename(), credential_store_key_dict))
self.num_retries = GetNumRetries()
+ self.max_retry_wait = GetMaxRetryDelay()
log_request = (debug >= 3)
log_response = (debug >= 3)
+ self.global_params = apitools_messages.StandardQueryParameters(
+ trace='token:%s' % trace_token) if trace_token else None
+
self.api_client = apitools_client.StorageV1(
url=self.url_base, http=self.http, log_request=log_request,
log_response=log_response, credentials=self.credentials,
- version=self.api_version)
+ version=self.api_version, default_global_params=self.global_params)
+ self.api_client.max_retry_wait = self.max_retry_wait
self.api_client.num_retries = self.num_retries
if no_op_credentials:
@@ -309,8 +337,40 @@ class GcsJsonApi(CloudApi):
except:
raise
- def _GetNewDownloadHttp(self, download_stream):
- return GetNewHttp(http_class=HttpWithDownloadStream, stream=download_stream)
+ def _GetCredentialStoreKeyDict(self, credentials):
+ """Disambiguates a credential for caching in a credential store.
+
+ Different credential types have different fields that identify them.
+ This function assembles relevant information in a dict and returns it.
+
+ Args:
+ credentials: An OAuth2Credentials object.
+
+ Returns:
+ Dict of relevant identifiers for credentials.
+ """
+ # TODO: If scopes ever become available in the credentials themselves,
+ # include them in the key dict.
+ key_dict = {'api_version': self.api_version}
+ # pylint: disable=protected-access
+ if isinstance(credentials, devshell.DevshellCredentials):
+ key_dict['user_email'] = credentials.user_email
+ elif isinstance(credentials,
+ oauth2client.service_account._ServiceAccountCredentials):
+ key_dict['_service_account_email'] = credentials._service_account_email
+ elif isinstance(credentials,
+ oauth2client.client.SignedJwtAssertionCredentials):
+ key_dict['service_account_name'] = credentials.service_account_name
+ elif isinstance(credentials, oauth2client.client.OAuth2Credentials):
+ if credentials.client_id and credentials.client_id != 'null':
+ key_dict['client_id'] = credentials.client_id
+ key_dict['refresh_token'] = credentials.refresh_token
+ # pylint: enable=protected-access
+
+ return key_dict
+
+ def _GetNewDownloadHttp(self):
+ return GetNewHttp(http_class=HttpWithDownloadStream)
def _GetNewUploadHttp(self):
"""Returns an upload-safe Http object (by disabling httplib2 retries)."""
@@ -360,6 +420,11 @@ class GcsJsonApi(CloudApi):
bucket_metadata.cors = []
apitools_include_fields.append('cors')
+ if (bucket_metadata.defaultObjectAcl and
+ bucket_metadata.defaultObjectAcl[0] == PRIVATE_DEFAULT_OBJ_ACL):
+ bucket_metadata.defaultObjectAcl = []
+ apitools_include_fields.append('defaultObjectAcl')
+
predefined_acl = None
if canned_acl:
# Must null out existing ACLs to apply a canned ACL.
@@ -528,14 +593,16 @@ class GcsJsonApi(CloudApi):
yield object_or_prefix
def _YieldObjectsAndPrefixes(self, object_list):
- if object_list.items:
- for cloud_obj in object_list.items:
- yield CloudApi.CsObjectOrPrefix(cloud_obj,
- CloudApi.CsObjectOrPrefixType.OBJECT)
+ # Yield prefixes first so that checking for the presence of a subdirectory
+ # is fast.
if object_list.prefixes:
for prefix in object_list.prefixes:
yield CloudApi.CsObjectOrPrefix(prefix,
CloudApi.CsObjectOrPrefixType.PREFIX)
+ if object_list.items:
+ for cloud_obj in object_list.items:
+ yield CloudApi.CsObjectOrPrefix(cloud_obj,
+ CloudApi.CsObjectOrPrefixType.OBJECT)
def GetObjectMetadata(self, bucket_name, object_name, generation=None,
provider=None, fields=None):
@@ -573,8 +640,14 @@ class GcsJsonApi(CloudApi):
if generation:
generation = long(generation)
+ # 'outer_total_size' is only used for formatting user output, and is
+ # expected to be one higher than the last byte that should be downloaded.
+ # TODO: Change DownloadCallbackConnectionClassFactory and progress callbacks
+ # to more elegantly handle total size for components of files.
outer_total_size = object_size
- if serialization_data:
+ if end_byte:
+ outer_total_size = end_byte + 1
+ elif serialization_data:
outer_total_size = json.loads(serialization_data)['total_size']
if progress_callback:
@@ -582,7 +655,7 @@ class GcsJsonApi(CloudApi):
raise ArgumentException('Download size is required when callbacks are '
'requested for a download, but no size was '
'provided.')
- progress_callback(0, outer_total_size)
+ progress_callback(start_byte, outer_total_size)
bytes_downloaded_container = BytesTransferredContainer()
bytes_downloaded_container.bytes_transferred = start_byte
@@ -592,10 +665,9 @@ class GcsJsonApi(CloudApi):
progress_callback=progress_callback, digesters=digesters)
download_http_class = callback_class_factory.GetConnectionClass()
- download_http = self._GetNewDownloadHttp(download_stream)
- download_http.connections = {'https': download_http_class}
- authorized_download_http = self.credentials.authorize(download_http)
- WrapDownloadHttpRequest(authorized_download_http)
+ # Point our download HTTP at our download stream.
+ self.download_http.stream = download_stream
+ self.download_http.connections = {'https': download_http_class}
if serialization_data:
apitools_download = apitools_transfer.Download.FromData(
@@ -606,7 +678,7 @@ class GcsJsonApi(CloudApi):
download_stream, auto_transfer=False, total_size=object_size,
num_retries=self.num_retries)
- apitools_download.bytes_http = authorized_download_http
+ apitools_download.bytes_http = self.authorized_download_http
apitools_request = apitools_messages.StorageObjectsGetRequest(
bucket=bucket_name, object=object_name, generation=generation)
@@ -654,7 +726,7 @@ class GcsJsonApi(CloudApi):
raise ResumableDownloadException(
'Transfer failed after %d retries. Final exception: %s' %
(self.num_retries, unicode(e).encode(UTF8)))
- time.sleep(CalculateWaitForRetry(retries, max_wait=GetMaxRetryDelay()))
+ time.sleep(CalculateWaitForRetry(retries, max_wait=self.max_retry_wait))
if self.logger.isEnabledFor(logging.DEBUG):
self.logger.debug(
'Retrying download from byte %s after exception: %s. Trace: %s',
@@ -694,7 +766,7 @@ class GcsJsonApi(CloudApi):
'accept-encoding': 'gzip',
'user-agent': self.api_client.user_agent
}
- if start_byte or end_byte:
+ if start_byte or end_byte is not None:
apitools_download.GetRange(additional_headers=additional_headers,
start=start_byte, end=end_byte,
use_chunks=False)
@@ -781,13 +853,10 @@ class GcsJsonApi(CloudApi):
bytes_uploaded_container, total_size=total_size,
progress_callback=progress_callback)
- upload_http = self._GetNewUploadHttp()
upload_http_class = callback_class_factory.GetConnectionClass()
- upload_http.connections = {'http': upload_http_class,
- 'https': upload_http_class}
+ self.upload_http.connections = {'http': upload_http_class,
+ 'https': upload_http_class}
- authorized_upload_http = self.credentials.authorize(upload_http)
- WrapUploadHttpRequest(authorized_upload_http)
# Since bytes_http is created in this function, we don't get the
# user-agent header from api_client's http automatically.
additional_headers = {
@@ -822,7 +891,7 @@ class GcsJsonApi(CloudApi):
upload_stream, content_type, total_size=size, auto_transfer=True,
num_retries=self.num_retries)
apitools_upload.strategy = apitools_strategy
- apitools_upload.bytes_http = authorized_upload_http
+ apitools_upload.bytes_http = self.authorized_upload_http
return self.api_client.objects.Insert(
apitools_request,
@@ -830,13 +899,16 @@ class GcsJsonApi(CloudApi):
global_params=global_params)
else: # Resumable upload.
return self._PerformResumableUpload(
- upload_stream, authorized_upload_http, content_type, size,
+ upload_stream, self.authorized_upload_http, content_type, size,
serialization_data, apitools_strategy, apitools_request,
global_params, bytes_uploaded_container, tracker_callback,
additional_headers, progress_callback)
except TRANSLATABLE_APITOOLS_EXCEPTIONS, e:
+ not_found_exception = CreateNotFoundExceptionForObjectWrite(
+ self.provider, object_metadata.bucket)
self._TranslateExceptionAndRaise(e, bucket_name=object_metadata.bucket,
- object_name=object_metadata.name)
+ object_name=object_metadata.name,
+ not_found_exception=not_found_exception)
def _PerformResumableUpload(
self, upload_stream, authorized_upload_http, content_type, size,
@@ -929,7 +1001,7 @@ class GcsJsonApi(CloudApi):
'Transfer failed after %d retries. Final exception: %s' %
(self.num_retries, e2))
time.sleep(
- CalculateWaitForRetry(retries, max_wait=GetMaxRetryDelay()))
+ CalculateWaitForRetry(retries, max_wait=self.max_retry_wait))
if start_byte > last_progress_byte:
# We've made progress, so allow a fresh set of retries.
last_progress_byte = start_byte
@@ -941,7 +1013,7 @@ class GcsJsonApi(CloudApi):
'Transfer failed after %d retries. Final exception: %s' %
(self.num_retries, unicode(e).encode(UTF8)))
time.sleep(
- CalculateWaitForRetry(retries, max_wait=GetMaxRetryDelay()))
+ CalculateWaitForRetry(retries, max_wait=self.max_retry_wait))
if self.logger.isEnabledFor(logging.DEBUG):
self.logger.debug(
'Retrying upload from byte %s after exception: %s. Trace: %s',
@@ -1069,8 +1141,13 @@ class GcsJsonApi(CloudApi):
DeleteTrackerFile(tracker_file_name)
return rewrite_response.resource
except TRANSLATABLE_APITOOLS_EXCEPTIONS, e:
+ not_found_exception = CreateNotFoundExceptionForObjectWrite(
+ self.provider, dst_obj_metadata.bucket, src_provider=self.provider,
+ src_bucket_name=src_obj_metadata.bucket,
+ src_object_name=src_obj_metadata.name, src_generation=src_generation)
self._TranslateExceptionAndRaise(e, bucket_name=dst_obj_metadata.bucket,
- object_name=dst_obj_metadata.name)
+ object_name=dst_obj_metadata.name,
+ not_found_exception=not_found_exception)
def DeleteObject(self, bucket_name, object_name, preconditions=None,
generation=None, provider=None):
@@ -1210,7 +1287,7 @@ class GcsJsonApi(CloudApi):
raise ArgumentException('Invalid canned ACL %s' % canned_acl_string)
def _TranslateExceptionAndRaise(self, e, bucket_name=None, object_name=None,
- generation=None):
+ generation=None, not_found_exception=None):
"""Translates an HTTP exception and raises the translated or original value.
Args:
@@ -1218,6 +1295,7 @@ class GcsJsonApi(CloudApi):
bucket_name: Optional bucket name in request that caused the exception.
object_name: Optional object name in request that caused the exception.
generation: Optional generation in request that caused the exception.
+ not_found_exception: Optional exception to raise in the not-found case.
Raises:
Translated CloudApi exception, or the original exception if it was not
@@ -1225,7 +1303,7 @@ class GcsJsonApi(CloudApi):
"""
translated_exception = self._TranslateApitoolsException(
e, bucket_name=bucket_name, object_name=object_name,
- generation=generation)
+ generation=generation, not_found_exception=not_found_exception)
if translated_exception:
raise translated_exception
else:
@@ -1242,8 +1320,7 @@ class GcsJsonApi(CloudApi):
# If we couldn't decode anything, just leave the message as None.
pass
- def _TranslateApitoolsResumableUploadException(
- self, e, bucket_name=None, object_name=None, generation=None):
+ def _TranslateApitoolsResumableUploadException(self, e):
if isinstance(e, apitools_exceptions.HttpError):
message = self._GetMessageFromHttpError(e)
if (e.status_code == 503 and
@@ -1274,7 +1351,7 @@ class GcsJsonApi(CloudApi):
return ResumableUploadAbortException(e.message)
def _TranslateApitoolsException(self, e, bucket_name=None, object_name=None,
- generation=None):
+ generation=None, not_found_exception=None):
"""Translates apitools exceptions into their gsutil Cloud Api equivalents.
Args:
@@ -1282,6 +1359,7 @@ class GcsJsonApi(CloudApi):
bucket_name: Optional bucket name in request that caused the exception.
object_name: Optional object name in request that caused the exception.
generation: Optional generation in request that caused the exception.
+ not_found_exception: Optional exception to raise in the not-found case.
Returns:
CloudStorageApiServiceException for translatable exceptions, None
@@ -1333,7 +1411,12 @@ class GcsJsonApi(CloudApi):
return AccessDeniedException(message or e.message,
status=e.status_code)
elif e.status_code == 404:
- if bucket_name:
+ if not_found_exception:
+ # The exception is pre-constructed prior to translation; the HTTP
+ # status code isn't available at that time.
+ setattr(not_found_exception, 'status', e.status_code)
+ return not_found_exception
+ elif bucket_name:
if object_name:
return CreateObjectNotFoundException(e.status_code, self.provider,
bucket_name, object_name,
@@ -1341,6 +1424,7 @@ class GcsJsonApi(CloudApi):
return CreateBucketNotFoundException(e.status_code, self.provider,
bucket_name)
return NotFoundException(e.message, status=e.status_code)
+
elif e.status_code == 409 and bucket_name:
if 'The bucket you tried to delete was not empty.' in str(e):
return NotEmptyException('BucketNotEmpty (%s)' % bucket_name,
« no previous file with comments | « tools/telemetry/third_party/gsutilz/gslib/copy_helper.py ('k') | tools/telemetry/third_party/gsutilz/gslib/gcs_json_media.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698