Index: third_party/gsutil/gslib/gcs_json_api.py
diff --git a/third_party/gsutil/gslib/gcs_json_api.py b/third_party/gsutil/gslib/gcs_json_api.py
index e57671d35da9d677e9aeb92f30c676193a509ca5..d16fae974c4af2c6f96c8fe3f56862244caf436a 100644
--- a/third_party/gsutil/gslib/gcs_json_api.py
+++ b/third_party/gsutil/gslib/gcs_json_api.py
@@ -36,6 +36,7 @@ import boto
 from boto import config
 from gcs_oauth2_boto_plugin import oauth2_helper
 import httplib2
+import oauth2client
 from oauth2client import devshell
 from oauth2client import multistore_file
 
@@ -73,8 +74,10 @@ from gslib.tracker_file import HashRewriteParameters
 from gslib.tracker_file import ReadRewriteTrackerFile
 from gslib.tracker_file import WriteRewriteTrackerFile
 from gslib.translation_helper import CreateBucketNotFoundException
+from gslib.translation_helper import CreateNotFoundExceptionForObjectWrite
 from gslib.translation_helper import CreateObjectNotFoundException
 from gslib.translation_helper import DEFAULT_CONTENT_TYPE
+from gslib.translation_helper import PRIVATE_DEFAULT_OBJ_ACL
 from gslib.translation_helper import REMOVE_CORS_CONFIG
 from gslib.util import GetBotoConfigFileList
 from gslib.util import GetCertsFile
@@ -132,7 +135,7 @@ class GcsJsonApi(CloudApi):
   """Google Cloud Storage JSON implementation of gsutil Cloud API."""
 
   def __init__(self, bucket_storage_uri_class, logger, provider=None,
-               credentials=None, debug=0):
+               credentials=None, debug=0, trace_token=None):
     """Performs necessary setup for interacting with Google Cloud Storage.
 
     Args:
@@ -142,6 +145,7 @@ class GcsJsonApi(CloudApi):
       credentials: Credentials to be used for interacting with Google Cloud
                    Storage.
       debug: Debug level for the API implementation (0..3).
+      trace_token: Trace token to pass to the API implementation.
     """
     # TODO: Plumb host_header for perfdiag / test_perfdiag.
     # TODO: Add jitter to apitools' http_wrapper retry mechanism.
@@ -163,6 +167,22 @@ class GcsJsonApi(CloudApi):
     self.certs_file = GetCertsFile()
 
     self.http = GetNewHttp()
+
+    # Re-use download and upload connections. This class is only called
+    # sequentially, but we can share TCP warmed-up connections across calls.
+    self.download_http = self._GetNewDownloadHttp()
+    self.upload_http = self._GetNewUploadHttp()
+    if self.credentials:
+      self.authorized_download_http = self.credentials.authorize(
+          self.download_http)
+      self.authorized_upload_http = self.credentials.authorize(self.upload_http)
+    else:
+      self.authorized_download_http = self.download_http
+      self.authorized_upload_http = self.upload_http
+
+    WrapDownloadHttpRequest(self.authorized_download_http)
+    WrapUploadHttpRequest(self.authorized_upload_http)
+
     self.http_base = 'https://'
     gs_json_host = config.get('Credentials', 'gs_json_host', None)
     self.host_base = gs_json_host or 'www.googleapis.com'
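
Note on the hunk above: the download and upload Http objects are now created and authorized once in __init__ and shared across calls, so warmed-up TCP connections survive sequential requests; later hunks re-point the shared objects' per-call state instead of rebuilding them. A minimal standalone sketch of the pattern (illustrative only; httplib2.Http and the oauth2client authorize() call are the same ones the patch relies on):

    import httplib2

    class Client(object):

      def __init__(self, credentials=None):
        # Authorize once; the authorized Http keeps its warmed-up TCP
        # connections alive across sequential calls.
        self.download_http = httplib2.Http()
        if credentials:
          self.authorized_download_http = credentials.authorize(
              self.download_http)
        else:
          self.authorized_download_http = self.download_http

      def GetObjectMedia(self, stream):
        # Re-point per-call state on the shared object instead of building
        # a new Http (and a new connection) for every download.
        self.download_http.stream = stream
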
@@ -195,19 +215,27 @@
     self.url_base = (self.http_base + self.host_base + self.host_port + '/' +
                      'storage/' + self.api_version + '/')
 
+    credential_store_key_dict = self._GetCredentialStoreKeyDict(
+        self.credentials)
+
     self.credentials.set_store(
-        multistore_file.get_credential_storage_custom_string_key(
-            GetCredentialStoreFilename(), self.api_version))
+        multistore_file.get_credential_storage_custom_key(
+            GetCredentialStoreFilename(), credential_store_key_dict))
 
     self.num_retries = GetNumRetries()
+    self.max_retry_wait = GetMaxRetryDelay()
 
     log_request = (debug >= 3)
     log_response = (debug >= 3)
 
+    self.global_params = apitools_messages.StandardQueryParameters(
+        trace='token:%s' % trace_token) if trace_token else None
+
     self.api_client = apitools_client.StorageV1(
         url=self.url_base, http=self.http, log_request=log_request,
         log_response=log_response, credentials=self.credentials,
-        version=self.api_version)
+        version=self.api_version, default_global_params=self.global_params)
+    self.api_client.max_retry_wait = self.max_retry_wait
     self.api_client.num_retries = self.num_retries
 
     if no_op_credentials:
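
The trace_token plumbing above rides on apitools' StandardQueryParameters: when a token is supplied it becomes the client's default_global_params, so every issued request should carry a trace=token:<value> query parameter (assuming apitools serializes default global params into the query string, which is what this patch relies on). A sketch with a made-up token value:

    # 'abc123' is an illustrative token, not a real one.
    global_params = apitools_messages.StandardQueryParameters(
        trace='token:%s' % 'abc123')
    # A client constructed with default_global_params=global_params is
    # expected to append '?trace=token:abc123' to each request URL.
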
@@ -309,8 +337,40 @@
     except:
       raise
 
-  def _GetNewDownloadHttp(self, download_stream):
-    return GetNewHttp(http_class=HttpWithDownloadStream, stream=download_stream)
+  def _GetCredentialStoreKeyDict(self, credentials):
+    """Disambiguates a credential for caching in a credential store.
+
+    Different credential types have different fields that identify them.
+    This function assembles relevant information in a dict and returns it.
+
+    Args:
+      credentials: An OAuth2Credentials object.
+
+    Returns:
+      Dict of relevant identifiers for credentials.
+    """
+    # TODO: If scopes ever become available in the credentials themselves,
+    # include them in the key dict.
+    key_dict = {'api_version': self.api_version}
+    # pylint: disable=protected-access
+    if isinstance(credentials, devshell.DevshellCredentials):
+      key_dict['user_email'] = credentials.user_email
+    elif isinstance(credentials,
+                    oauth2client.service_account._ServiceAccountCredentials):
+      key_dict['_service_account_email'] = credentials._service_account_email
+    elif isinstance(credentials,
+                    oauth2client.client.SignedJwtAssertionCredentials):
+      key_dict['service_account_name'] = credentials.service_account_name
+    elif isinstance(credentials, oauth2client.client.OAuth2Credentials):
+      if credentials.client_id and credentials.client_id != 'null':
+        key_dict['client_id'] = credentials.client_id
+      key_dict['refresh_token'] = credentials.refresh_token
+    # pylint: enable=protected-access
+
+    return key_dict
+
+  def _GetNewDownloadHttp(self):
+    return GetNewHttp(http_class=HttpWithDownloadStream)
 
   def _GetNewUploadHttp(self):
     """Returns an upload-safe Http object (by disabling httplib2 retries)."""
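
_GetCredentialStoreKeyDict above replaces the old single-string cache key (just the API version) so that credentials for different identities no longer collide in the store: the dict folds in whichever field identifies the credential type. A hedged sketch of how the dict selects a cache entry (the field values below are made up):

    key_dict = {'api_version': 'v1',
                'client_id': '12345.apps.googleusercontent.com',
                'refresh_token': '1/xyz'}
    storage = multistore_file.get_credential_storage_custom_key(
        GetCredentialStoreFilename(), key_dict)
    # A credential with a different refresh_token (or a devshell credential,
    # keyed by user_email instead) maps to a different entry, so switching
    # accounts cannot serve a stale cached token.
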
@@ -360,6 +420,11 @@
       bucket_metadata.cors = []
       apitools_include_fields.append('cors')
 
+    if (bucket_metadata.defaultObjectAcl and
+        bucket_metadata.defaultObjectAcl[0] == PRIVATE_DEFAULT_OBJ_ACL):
+      bucket_metadata.defaultObjectAcl = []
+      apitools_include_fields.append('defaultObjectAcl')
+
     predefined_acl = None
     if canned_acl:
       # Must null out existing ACLs to apply a canned ACL.
@@ -528,14 +593,16 @@
         yield object_or_prefix
 
   def _YieldObjectsAndPrefixes(self, object_list):
-    if object_list.items:
-      for cloud_obj in object_list.items:
-        yield CloudApi.CsObjectOrPrefix(cloud_obj,
-                                        CloudApi.CsObjectOrPrefixType.OBJECT)
+    # Yield prefixes first so that checking for the presence of a subdirectory
+    # is fast.
     if object_list.prefixes:
       for prefix in object_list.prefixes:
         yield CloudApi.CsObjectOrPrefix(prefix,
                                         CloudApi.CsObjectOrPrefixType.PREFIX)
+    if object_list.items:
+      for cloud_obj in object_list.items:
+        yield CloudApi.CsObjectOrPrefix(cloud_obj,
+                                        CloudApi.CsObjectOrPrefixType.OBJECT)
 
   def GetObjectMetadata(self, bucket_name, object_name, generation=None,
                         provider=None, fields=None):
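
Reordering _YieldObjectsAndPrefixes above means a caller probing for a subdirectory can stop at the first yielded item rather than paging through objects first. A small sketch under that assumption (the datatype attribute name on CsObjectOrPrefix is assumed here; the hunk passes it as the second constructor argument):

    def HasPrefix(objects_and_prefixes):
      # Prefixes are yielded first, so the first item settles the question.
      for item in objects_and_prefixes:
        return item.datatype == CloudApi.CsObjectOrPrefixType.PREFIX
      return False
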
@@ -573,8 +640,14 @@
     if generation:
       generation = long(generation)
 
+    # 'outer_total_size' is only used for formatting user output, and is
+    # expected to be one higher than the last byte that should be downloaded.
+    # TODO: Change DownloadCallbackConnectionClassFactory and progress callbacks
+    # to more elegantly handle total size for components of files.
     outer_total_size = object_size
-    if serialization_data:
+    if end_byte:
+      outer_total_size = end_byte + 1
+    elif serialization_data:
       outer_total_size = json.loads(serialization_data)['total_size']
 
     if progress_callback:
@@ -582,7 +655,7 @@
         raise ArgumentException('Download size is required when callbacks are '
                                 'requested for a download, but no size was '
                                 'provided.')
-      progress_callback(0, outer_total_size)
+      progress_callback(start_byte, outer_total_size)
 
     bytes_downloaded_container = BytesTransferredContainer()
     bytes_downloaded_container.bytes_transferred = start_byte
@@ -592,10 +665,9 @@
         progress_callback=progress_callback, digesters=digesters)
     download_http_class = callback_class_factory.GetConnectionClass()
 
-    download_http = self._GetNewDownloadHttp(download_stream)
-    download_http.connections = {'https': download_http_class}
-    authorized_download_http = self.credentials.authorize(download_http)
-    WrapDownloadHttpRequest(authorized_download_http)
+    # Point our download HTTP at our download stream.
+    self.download_http.stream = download_stream
+    self.download_http.connections = {'https': download_http_class}
 
     if serialization_data:
       apitools_download = apitools_transfer.Download.FromData(
@@ -606,7 +678,7 @@
           download_stream, auto_transfer=False, total_size=object_size,
           num_retries=self.num_retries)
 
-    apitools_download.bytes_http = authorized_download_http
+    apitools_download.bytes_http = self.authorized_download_http
     apitools_request = apitools_messages.StorageObjectsGetRequest(
         bucket=bucket_name, object=object_name, generation=generation)
 
@@ -654,7 +726,7 @@
           raise ResumableDownloadException(
               'Transfer failed after %d retries. Final exception: %s' %
               (self.num_retries, unicode(e).encode(UTF8)))
-        time.sleep(CalculateWaitForRetry(retries, max_wait=GetMaxRetryDelay()))
+        time.sleep(CalculateWaitForRetry(retries, max_wait=self.max_retry_wait))
        if self.logger.isEnabledFor(logging.DEBUG):
          self.logger.debug(
              'Retrying download from byte %s after exception: %s. Trace: %s',
@@ -694,7 +766,7 @@
         'accept-encoding': 'gzip',
         'user-agent': self.api_client.user_agent
     }
-    if start_byte or end_byte:
+    if start_byte or end_byte is not None:
       apitools_download.GetRange(additional_headers=additional_headers,
                                  start=start_byte, end=end_byte,
                                  use_chunks=False)
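
The guard change from 'if start_byte or end_byte:' to 'if start_byte or end_byte is not None:' matters because byte 0 is a valid inclusive end of a range: requesting exactly the first byte sets end_byte to 0, which is falsy, so the old test silently skipped the range request. The earlier hunk's outer_total_size = end_byte + 1 reflects the same inclusive-end convention. A short demonstration:

    start_byte, end_byte = 0, 0                 # ask for exactly the first byte
    assert not (start_byte or end_byte)         # old test: range would be skipped
    assert start_byte or end_byte is not None   # new test: range is applied
    assert end_byte + 1 == 1                    # inclusive end -> total size 1
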
@@ -781,13 +853,10 @@
         bytes_uploaded_container, total_size=total_size,
         progress_callback=progress_callback)
 
-    upload_http = self._GetNewUploadHttp()
     upload_http_class = callback_class_factory.GetConnectionClass()
-    upload_http.connections = {'http': upload_http_class,
-                               'https': upload_http_class}
+    self.upload_http.connections = {'http': upload_http_class,
+                                    'https': upload_http_class}
 
-    authorized_upload_http = self.credentials.authorize(upload_http)
-    WrapUploadHttpRequest(authorized_upload_http)
     # Since bytes_http is created in this function, we don't get the
     # user-agent header from api_client's http automatically.
     additional_headers = {
@@ -822,7 +891,7 @@
            upload_stream, content_type, total_size=size, auto_transfer=True,
            num_retries=self.num_retries)
        apitools_upload.strategy = apitools_strategy
-        apitools_upload.bytes_http = authorized_upload_http
+        apitools_upload.bytes_http = self.authorized_upload_http
 
        return self.api_client.objects.Insert(
            apitools_request,
@@ -830,13 +899,16 @@
            global_params=global_params)
      else:  # Resumable upload.
        return self._PerformResumableUpload(
-            upload_stream, authorized_upload_http, content_type, size,
+            upload_stream, self.authorized_upload_http, content_type, size,
            serialization_data, apitools_strategy, apitools_request,
            global_params, bytes_uploaded_container, tracker_callback,
            additional_headers, progress_callback)
    except TRANSLATABLE_APITOOLS_EXCEPTIONS, e:
+      not_found_exception = CreateNotFoundExceptionForObjectWrite(
+          self.provider, object_metadata.bucket)
      self._TranslateExceptionAndRaise(e, bucket_name=object_metadata.bucket,
-                                       object_name=object_metadata.name)
+                                       object_name=object_metadata.name,
+                                       not_found_exception=not_found_exception)
 
  def _PerformResumableUpload(
      self, upload_stream, authorized_upload_http, content_type, size,
@@ -929,7 +1001,7 @@
                  'Transfer failed after %d retries. Final exception: %s' %
                  (self.num_retries, e2))
            time.sleep(
-              CalculateWaitForRetry(retries, max_wait=GetMaxRetryDelay()))
+              CalculateWaitForRetry(retries, max_wait=self.max_retry_wait))
        if start_byte > last_progress_byte:
          # We've made progress, so allow a fresh set of retries.
          last_progress_byte = start_byte
@@ -941,7 +1013,7 @@
              'Transfer failed after %d retries. Final exception: %s' %
              (self.num_retries, unicode(e).encode(UTF8)))
        time.sleep(
-            CalculateWaitForRetry(retries, max_wait=GetMaxRetryDelay()))
+            CalculateWaitForRetry(retries, max_wait=self.max_retry_wait))
        if self.logger.isEnabledFor(logging.DEBUG):
          self.logger.debug(
              'Retrying upload from byte %s after exception: %s. Trace: %s',
@@ -1069,8 +1141,13 @@
          DeleteTrackerFile(tracker_file_name)
          return rewrite_response.resource
    except TRANSLATABLE_APITOOLS_EXCEPTIONS, e:
+      not_found_exception = CreateNotFoundExceptionForObjectWrite(
+          self.provider, dst_obj_metadata.bucket, src_provider=self.provider,
+          src_bucket_name=src_obj_metadata.bucket,
+          src_object_name=src_obj_metadata.name, src_generation=src_generation)
      self._TranslateExceptionAndRaise(e, bucket_name=dst_obj_metadata.bucket,
-                                       object_name=dst_obj_metadata.name)
+                                       object_name=dst_obj_metadata.name,
+                                       not_found_exception=not_found_exception)
 
  def DeleteObject(self, bucket_name, object_name, preconditions=None,
                   generation=None, provider=None):
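
In the upload and rewrite paths above, the bucket-specific not-found exception is built inside the except block, while object_metadata / src_obj_metadata are still in scope, and only then handed to the translator, which stamps the real HTTP status onto it (see the 404 hunk below). A hedged sketch of the flow, with DoWrite and translate_and_raise as hypothetical stand-ins:

    try:
      DoWrite()  # hypothetical stand-in for the Insert/Rewrite call
    except TRANSLATABLE_APITOOLS_EXCEPTIONS, e:
      not_found = CreateNotFoundExceptionForObjectWrite(provider, bucket_name)
      # The translator sets not_found.status from the response code before
      # raising it.
      translate_and_raise(e, not_found_exception=not_found)
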
@@ -1210,7 +1287,7 @@
      raise ArgumentException('Invalid canned ACL %s' % canned_acl_string)
 
  def _TranslateExceptionAndRaise(self, e, bucket_name=None, object_name=None,
-                                  generation=None):
+                                  generation=None, not_found_exception=None):
    """Translates an HTTP exception and raises the translated or original value.
 
    Args:
@@ -1218,6 +1295,7 @@
      bucket_name: Optional bucket name in request that caused the exception.
      object_name: Optional object name in request that caused the exception.
      generation: Optional generation in request that caused the exception.
+      not_found_exception: Optional exception to raise in the not-found case.
 
    Raises:
      Translated CloudApi exception, or the original exception if it was not
@@ -1225,7 +1303,7 @@
    """
    translated_exception = self._TranslateApitoolsException(
        e, bucket_name=bucket_name, object_name=object_name,
-        generation=generation)
+        generation=generation, not_found_exception=not_found_exception)
    if translated_exception:
      raise translated_exception
    else:
@@ -1242,8 +1320,7 @@
      # If we couldn't decode anything, just leave the message as None.
      pass
 
-  def _TranslateApitoolsResumableUploadException(
-      self, e, bucket_name=None, object_name=None, generation=None):
+  def _TranslateApitoolsResumableUploadException(self, e):
    if isinstance(e, apitools_exceptions.HttpError):
      message = self._GetMessageFromHttpError(e)
      if (e.status_code == 503 and
@@ -1274,7 +1351,7 @@
      return ResumableUploadAbortException(e.message)
 
  def _TranslateApitoolsException(self, e, bucket_name=None, object_name=None,
-                                  generation=None):
+                                  generation=None, not_found_exception=None):
    """Translates apitools exceptions into their gsutil Cloud Api equivalents.
 
    Args:
@@ -1282,6 +1359,7 @@
      bucket_name: Optional bucket name in request that caused the exception.
      object_name: Optional object name in request that caused the exception.
      generation: Optional generation in request that caused the exception.
+      not_found_exception: Optional exception to raise in the not-found case.
 
    Returns:
      CloudStorageApiServiceException for translatable exceptions, None
@@ -1333,7 +1411,12 @@
        return AccessDeniedException(message or e.message,
                                     status=e.status_code)
      elif e.status_code == 404:
-        if bucket_name:
+        if not_found_exception:
+          # The exception is pre-constructed prior to translation; the HTTP
+          # status code isn't available at that time.
+          setattr(not_found_exception, 'status', e.status_code)
+          return not_found_exception
+        elif bucket_name:
          if object_name:
            return CreateObjectNotFoundException(e.status_code, self.provider,
                                                 bucket_name, object_name,
@@ -1341,6 +1424,7 @@
          return CreateBucketNotFoundException(e.status_code, self.provider,
                                               bucket_name)
        return NotFoundException(e.message, status=e.status_code)
+
      elif e.status_code == 409 and bucket_name:
        if 'The bucket you tried to delete was not empty.' in str(e):
          return NotEmptyException('BucketNotEmpty (%s)' % bucket_name,
|