Index: tools/telemetry/third_party/gsutilz/gslib/gcs_json_api.py
diff --git a/tools/telemetry/third_party/gsutilz/gslib/gcs_json_api.py b/tools/telemetry/third_party/gsutilz/gslib/gcs_json_api.py
index e57671d35da9d677e9aeb92f30c676193a509ca5..d16fae974c4af2c6f96c8fe3f56862244caf436a 100644
--- a/tools/telemetry/third_party/gsutilz/gslib/gcs_json_api.py
+++ b/tools/telemetry/third_party/gsutilz/gslib/gcs_json_api.py
@@ -36,6 +36,7 @@ import boto
 from boto import config
 from gcs_oauth2_boto_plugin import oauth2_helper
 import httplib2
+import oauth2client
 from oauth2client import devshell
 from oauth2client import multistore_file
 
@@ -73,8 +74,10 @@ from gslib.tracker_file import HashRewriteParameters
 from gslib.tracker_file import ReadRewriteTrackerFile
 from gslib.tracker_file import WriteRewriteTrackerFile
 from gslib.translation_helper import CreateBucketNotFoundException
+from gslib.translation_helper import CreateNotFoundExceptionForObjectWrite
 from gslib.translation_helper import CreateObjectNotFoundException
 from gslib.translation_helper import DEFAULT_CONTENT_TYPE
+from gslib.translation_helper import PRIVATE_DEFAULT_OBJ_ACL
 from gslib.translation_helper import REMOVE_CORS_CONFIG
 from gslib.util import GetBotoConfigFileList
 from gslib.util import GetCertsFile
@@ -132,7 +135,7 @@ class GcsJsonApi(CloudApi):
   """Google Cloud Storage JSON implementation of gsutil Cloud API."""
 
   def __init__(self, bucket_storage_uri_class, logger, provider=None,
-               credentials=None, debug=0):
+               credentials=None, debug=0, trace_token=None):
     """Performs necessary setup for interacting with Google Cloud Storage.
 
     Args:
@@ -142,6 +145,7 @@ class GcsJsonApi(CloudApi):
       credentials: Credentials to be used for interacting with Google Cloud
                    Storage.
       debug: Debug level for the API implementation (0..3).
+      trace_token: Trace token to pass to the API implementation.
     """
     # TODO: Plumb host_header for perfdiag / test_perfdiag.
     # TODO: Add jitter to apitools' http_wrapper retry mechanism.
@@ -163,6 +167,22 @@ class GcsJsonApi(CloudApi):
     self.certs_file = GetCertsFile()
 
     self.http = GetNewHttp()
+
+    # Re-use download and upload connections. This class is only called
+    # sequentially, but we can share TCP warmed-up connections across calls.
+    self.download_http = self._GetNewDownloadHttp()
+    self.upload_http = self._GetNewUploadHttp()
+    if self.credentials:
+      self.authorized_download_http = self.credentials.authorize(
+          self.download_http)
+      self.authorized_upload_http = self.credentials.authorize(self.upload_http)
+    else:
+      self.authorized_download_http = self.download_http
+      self.authorized_upload_http = self.upload_http
+
+    WrapDownloadHttpRequest(self.authorized_download_http)
+    WrapUploadHttpRequest(self.authorized_upload_http)
+
     self.http_base = 'https://'
     gs_json_host = config.get('Credentials', 'gs_json_host', None)
     self.host_base = gs_json_host or 'www.googleapis.com'
@@ -195,19 +215,27 @@
     self.url_base = (self.http_base + self.host_base + self.host_port + '/' +
                      'storage/' + self.api_version + '/')
 
+    credential_store_key_dict = self._GetCredentialStoreKeyDict(
+        self.credentials)
+
     self.credentials.set_store(
-        multistore_file.get_credential_storage_custom_string_key(
-            GetCredentialStoreFilename(), self.api_version))
+        multistore_file.get_credential_storage_custom_key(
+            GetCredentialStoreFilename(), credential_store_key_dict))
 
     self.num_retries = GetNumRetries()
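+    # Cache the configured maximum retry delay; the retry loops below and the
+    # apitools client reuse it.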
+    self.max_retry_wait = GetMaxRetryDelay()
 
     log_request = (debug >= 3)
     log_response = (debug >= 3)
 
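+    # If a trace token was supplied, send it with every request as a default
+    # global query parameter.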
+    self.global_params = apitools_messages.StandardQueryParameters(
+        trace='token:%s' % trace_token) if trace_token else None
+
     self.api_client = apitools_client.StorageV1(
         url=self.url_base, http=self.http, log_request=log_request,
         log_response=log_response, credentials=self.credentials,
-        version=self.api_version)
+        version=self.api_version, default_global_params=self.global_params)
+    self.api_client.max_retry_wait = self.max_retry_wait
     self.api_client.num_retries = self.num_retries
 
     if no_op_credentials:
@@ -309,8 +337,40 @@ class GcsJsonApi(CloudApi):
     except:
       raise
 
-  def _GetNewDownloadHttp(self, download_stream):
-    return GetNewHttp(http_class=HttpWithDownloadStream, stream=download_stream)
+  def _GetCredentialStoreKeyDict(self, credentials):
+    """Disambiguates a credential for caching in a credential store.
+
+    Different credential types have different fields that identify them.
+    This function assembles relevant information in a dict and returns it.
+
+    Args:
+      credentials: An OAuth2Credentials object.
+
+    Returns:
+      Dict of relevant identifiers for credentials.
+    """
+    # TODO: If scopes ever become available in the credentials themselves,
+    # include them in the key dict.
+    key_dict = {'api_version': self.api_version}
+    # pylint: disable=protected-access
+    if isinstance(credentials, devshell.DevshellCredentials):
+      key_dict['user_email'] = credentials.user_email
+    elif isinstance(credentials,
+                    oauth2client.service_account._ServiceAccountCredentials):
+      key_dict['_service_account_email'] = credentials._service_account_email
+    elif isinstance(credentials,
+                    oauth2client.client.SignedJwtAssertionCredentials):
+      key_dict['service_account_name'] = credentials.service_account_name
+    elif isinstance(credentials, oauth2client.client.OAuth2Credentials):
+      if credentials.client_id and credentials.client_id != 'null':
+        key_dict['client_id'] = credentials.client_id
+      key_dict['refresh_token'] = credentials.refresh_token
+    # pylint: enable=protected-access
+
+    return key_dict
+
+  def _GetNewDownloadHttp(self):
+    return GetNewHttp(http_class=HttpWithDownloadStream)
 
   def _GetNewUploadHttp(self):
     """Returns an upload-safe Http object (by disabling httplib2 retries)."""
@@ -360,6 +420,11 @@ class GcsJsonApi(CloudApi):
       bucket_metadata.cors = []
       apitools_include_fields.append('cors')
 
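+    # A defaultObjectAcl containing only the PRIVATE_DEFAULT_OBJ_ACL
+    # placeholder is translated to an empty ACL, explicitly included in the
+    # patch so the service applies a private default object ACL.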
+    if (bucket_metadata.defaultObjectAcl and
+        bucket_metadata.defaultObjectAcl[0] == PRIVATE_DEFAULT_OBJ_ACL):
+      bucket_metadata.defaultObjectAcl = []
+      apitools_include_fields.append('defaultObjectAcl')
+
     predefined_acl = None
     if canned_acl:
       # Must null out existing ACLs to apply a canned ACL.
@@ -528,14 +593,16 @@
       yield object_or_prefix
 
   def _YieldObjectsAndPrefixes(self, object_list):
-    if object_list.items:
-      for cloud_obj in object_list.items:
-        yield CloudApi.CsObjectOrPrefix(cloud_obj,
-                                        CloudApi.CsObjectOrPrefixType.OBJECT)
+    # Yield prefixes first so that checking for the presence of a subdirectory
+    # is fast.
     if object_list.prefixes:
       for prefix in object_list.prefixes:
         yield CloudApi.CsObjectOrPrefix(prefix,
                                         CloudApi.CsObjectOrPrefixType.PREFIX)
+    if object_list.items:
+      for cloud_obj in object_list.items:
+        yield CloudApi.CsObjectOrPrefix(cloud_obj,
+                                        CloudApi.CsObjectOrPrefixType.OBJECT)
 
   def GetObjectMetadata(self, bucket_name, object_name, generation=None,
                         provider=None, fields=None):
@@ -573,8 +640,14 @@
     if generation:
       generation = long(generation)
 
+    # 'outer_total_size' is only used for formatting user output, and is
+    # expected to be one higher than the last byte that should be downloaded.
+    # TODO: Change DownloadCallbackConnectionClassFactory and progress callbacks
+    # to more elegantly handle total size for components of files.
     outer_total_size = object_size
-    if serialization_data:
+    if end_byte:
+      outer_total_size = end_byte + 1
+    elif serialization_data:
       outer_total_size = json.loads(serialization_data)['total_size']
 
     if progress_callback:
@@ -582,7 +655,7 @@
         raise ArgumentException('Download size is required when callbacks are '
                                 'requested for a download, but no size was '
                                 'provided.')
-      progress_callback(0, outer_total_size)
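+      # A resumed download does not start at byte 0, so begin progress
+      # reporting at start_byte.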
+      progress_callback(start_byte, outer_total_size)
 
     bytes_downloaded_container = BytesTransferredContainer()
     bytes_downloaded_container.bytes_transferred = start_byte
@@ -592,10 +665,9 @@
         progress_callback=progress_callback, digesters=digesters)
     download_http_class = callback_class_factory.GetConnectionClass()
 
-    download_http = self._GetNewDownloadHttp(download_stream)
-    download_http.connections = {'https': download_http_class}
-    authorized_download_http = self.credentials.authorize(download_http)
-    WrapDownloadHttpRequest(authorized_download_http)
+    # Point our download HTTP at our download stream.
+    self.download_http.stream = download_stream
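+    # The connection class carries this call's progress callbacks and
+    # digesters, so it is re-set on the shared download Http object for
+    # every download.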
+    self.download_http.connections = {'https': download_http_class}
 
     if serialization_data:
       apitools_download = apitools_transfer.Download.FromData(
@@ -606,7 +678,7 @@
           download_stream, auto_transfer=False, total_size=object_size,
           num_retries=self.num_retries)
 
-    apitools_download.bytes_http = authorized_download_http
+    apitools_download.bytes_http = self.authorized_download_http
 
     apitools_request = apitools_messages.StorageObjectsGetRequest(
         bucket=bucket_name, object=object_name, generation=generation)
@@ -654,7 +726,7 @@
           raise ResumableDownloadException(
               'Transfer failed after %d retries. Final exception: %s' %
               (self.num_retries, unicode(e).encode(UTF8)))
-        time.sleep(CalculateWaitForRetry(retries, max_wait=GetMaxRetryDelay()))
+        time.sleep(CalculateWaitForRetry(retries, max_wait=self.max_retry_wait))
         if self.logger.isEnabledFor(logging.DEBUG):
           self.logger.debug(
               'Retrying download from byte %s after exception: %s. Trace: %s',
@@ -694,7 +766,7 @@
         'accept-encoding': 'gzip',
         'user-agent': self.api_client.user_agent
     }
-    if start_byte or end_byte:
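+    # end_byte may legitimately be 0, so compare against None rather than
+    # relying on truthiness.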
+    if start_byte or end_byte is not None:
       apitools_download.GetRange(additional_headers=additional_headers,
                                  start=start_byte, end=end_byte,
                                  use_chunks=False)
@@ -781,13 +853,10 @@
         bytes_uploaded_container, total_size=total_size,
         progress_callback=progress_callback)
 
-    upload_http = self._GetNewUploadHttp()
     upload_http_class = callback_class_factory.GetConnectionClass()
-    upload_http.connections = {'http': upload_http_class,
-                               'https': upload_http_class}
+    self.upload_http.connections = {'http': upload_http_class,
+                                    'https': upload_http_class}
 
-    authorized_upload_http = self.credentials.authorize(upload_http)
-    WrapUploadHttpRequest(authorized_upload_http)
     # Since bytes_http is created in this function, we don't get the
     # user-agent header from api_client's http automatically.
     additional_headers = {
@@ -822,7 +891,7 @@
            upload_stream, content_type, total_size=size, auto_transfer=True,
            num_retries=self.num_retries)
        apitools_upload.strategy = apitools_strategy
-       apitools_upload.bytes_http = authorized_upload_http
+       apitools_upload.bytes_http = self.authorized_upload_http
 
        return self.api_client.objects.Insert(
            apitools_request,
@@ -830,13 +899,16 @@
            global_params=global_params)
      else:  # Resumable upload.
        return self._PerformResumableUpload(
-           upload_stream, authorized_upload_http, content_type, size,
+           upload_stream, self.authorized_upload_http, content_type, size,
            serialization_data, apitools_strategy, apitools_request,
            global_params, bytes_uploaded_container, tracker_callback,
            additional_headers, progress_callback)
     except TRANSLATABLE_APITOOLS_EXCEPTIONS, e:
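+      # A 404 on an object write most likely means the destination bucket
+      # doesn't exist; pre-build a more specific not-found exception.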
+      not_found_exception = CreateNotFoundExceptionForObjectWrite(
+          self.provider, object_metadata.bucket)
       self._TranslateExceptionAndRaise(e, bucket_name=object_metadata.bucket,
-                                       object_name=object_metadata.name)
+                                       object_name=object_metadata.name,
+                                       not_found_exception=not_found_exception)
 
   def _PerformResumableUpload(
       self, upload_stream, authorized_upload_http, content_type, size,
@@ -929,7 +1001,7 @@
                 'Transfer failed after %d retries. Final exception: %s' %
                 (self.num_retries, e2))
           time.sleep(
-              CalculateWaitForRetry(retries, max_wait=GetMaxRetryDelay()))
+              CalculateWaitForRetry(retries, max_wait=self.max_retry_wait))
        if start_byte > last_progress_byte:
          # We've made progress, so allow a fresh set of retries.
          last_progress_byte = start_byte
@@ -941,7 +1013,7 @@
                'Transfer failed after %d retries. Final exception: %s' %
                (self.num_retries, unicode(e).encode(UTF8)))
          time.sleep(
-             CalculateWaitForRetry(retries, max_wait=GetMaxRetryDelay()))
+             CalculateWaitForRetry(retries, max_wait=self.max_retry_wait))
        if self.logger.isEnabledFor(logging.DEBUG):
          self.logger.debug(
              'Retrying upload from byte %s after exception: %s. Trace: %s',
@@ -1069,8 +1141,13 @@
        DeleteTrackerFile(tracker_file_name)
      return rewrite_response.resource
    except TRANSLATABLE_APITOOLS_EXCEPTIONS, e:
+      not_found_exception = CreateNotFoundExceptionForObjectWrite(
+          self.provider, dst_obj_metadata.bucket, src_provider=self.provider,
+          src_bucket_name=src_obj_metadata.bucket,
+          src_object_name=src_obj_metadata.name, src_generation=src_generation)
      self._TranslateExceptionAndRaise(e, bucket_name=dst_obj_metadata.bucket,
-                                       object_name=dst_obj_metadata.name)
+                                       object_name=dst_obj_metadata.name,
+                                       not_found_exception=not_found_exception)
 
   def DeleteObject(self, bucket_name, object_name, preconditions=None,
                    generation=None, provider=None):
@@ -1210,7 +1287,7 @@
      raise ArgumentException('Invalid canned ACL %s' % canned_acl_string)
 
   def _TranslateExceptionAndRaise(self, e, bucket_name=None, object_name=None,
-                                  generation=None):
+                                  generation=None, not_found_exception=None):
     """Translates an HTTP exception and raises the translated or original value.
 
     Args:
@@ -1218,6 +1295,7 @@
       bucket_name: Optional bucket name in request that caused the exception.
       object_name: Optional object name in request that caused the exception.
       generation: Optional generation in request that caused the exception.
+      not_found_exception: Optional exception to raise in the not-found case.
 
     Raises:
       Translated CloudApi exception, or the original exception if it was not
@@ -1225,7 +1303,7 @@
     """
     translated_exception = self._TranslateApitoolsException(
         e, bucket_name=bucket_name, object_name=object_name,
-        generation=generation)
+        generation=generation, not_found_exception=not_found_exception)
     if translated_exception:
       raise translated_exception
     else:
@@ -1242,8 +1320,7 @@
       # If we couldn't decode anything, just leave the message as None.
       pass
 
-  def _TranslateApitoolsResumableUploadException(
-      self, e, bucket_name=None, object_name=None, generation=None):
+  def _TranslateApitoolsResumableUploadException(self, e):
     if isinstance(e, apitools_exceptions.HttpError):
       message = self._GetMessageFromHttpError(e)
       if (e.status_code == 503 and
@@ -1274,7 +1351,7 @@
       return ResumableUploadAbortException(e.message)
 
   def _TranslateApitoolsException(self, e, bucket_name=None, object_name=None,
-                                  generation=None):
+                                  generation=None, not_found_exception=None):
     """Translates apitools exceptions into their gsutil Cloud Api equivalents.
 
     Args:
@@ -1282,6 +1359,7 @@
       bucket_name: Optional bucket name in request that caused the exception.
       object_name: Optional object name in request that caused the exception.
       generation: Optional generation in request that caused the exception.
+      not_found_exception: Optional exception to raise in the not-found case.
 
     Returns:
       CloudStorageApiServiceException for translatable exceptions, None
@@ -1333,7 +1411,12 @@
         return AccessDeniedException(message or e.message,
                                      status=e.status_code)
       elif e.status_code == 404:
-        if bucket_name:
+        if not_found_exception:
+          # The exception is pre-constructed prior to translation; the HTTP
+          # status code isn't available at that time.
+          setattr(not_found_exception, 'status', e.status_code)
+          return not_found_exception
+        elif bucket_name:
           if object_name:
             return CreateObjectNotFoundException(e.status_code, self.provider,
                                                  bucket_name, object_name,
@@ -1341,6 +1424,7 @@
           return CreateBucketNotFoundException(e.status_code, self.provider,
                                                bucket_name)
         return NotFoundException(e.message, status=e.status_code)
+
       elif e.status_code == 409 and bucket_name:
         if 'The bucket you tried to delete was not empty.' in str(e):
           return NotEmptyException('BucketNotEmpty (%s)' % bucket_name,