Index: third_party/gsutil/gslib/gcs_json_api.py |
diff --git a/third_party/gsutil/gslib/gcs_json_api.py b/third_party/gsutil/gslib/gcs_json_api.py |
new file mode 100644 |
index 0000000000000000000000000000000000000000..e57671d35da9d677e9aeb92f30c676193a509ca5 |
--- /dev/null |
+++ b/third_party/gsutil/gslib/gcs_json_api.py |
@@ -0,0 +1,1359 @@ |
+# -*- coding: utf-8 -*- |
+# Copyright 2014 Google Inc. All Rights Reserved. |
+# |
+# Licensed under the Apache License, Version 2.0 (the "License"); |
+# you may not use this file except in compliance with the License. |
+# You may obtain a copy of the License at |
+# |
+# http://www.apache.org/licenses/LICENSE-2.0 |
+# |
+# Unless required by applicable law or agreed to in writing, software |
+# distributed under the License is distributed on an "AS IS" BASIS, |
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
+# See the License for the specific language governing permissions and |
+# limitations under the License. |
+"""JSON gsutil Cloud API implementation for Google Cloud Storage.""" |
+ |
+from __future__ import absolute_import |
+ |
+import httplib |
+import json |
+import logging |
+import os |
+import socket |
+import ssl |
+import time |
+import traceback |
+ |
+from apitools.base.py import credentials_lib |
+from apitools.base.py import encoding |
+from apitools.base.py import exceptions as apitools_exceptions |
+from apitools.base.py import http_wrapper as apitools_http_wrapper |
+from apitools.base.py import transfer as apitools_transfer |
+from apitools.base.py.util import CalculateWaitForRetry |
+ |
+import boto |
+from boto import config |
+from gcs_oauth2_boto_plugin import oauth2_helper |
+import httplib2 |
+from oauth2client import devshell |
+from oauth2client import multistore_file |
+ |
+from gslib.cloud_api import AccessDeniedException |
+from gslib.cloud_api import ArgumentException |
+from gslib.cloud_api import BadRequestException |
+from gslib.cloud_api import CloudApi |
+from gslib.cloud_api import NotEmptyException |
+from gslib.cloud_api import NotFoundException |
+from gslib.cloud_api import PreconditionException |
+from gslib.cloud_api import Preconditions |
+from gslib.cloud_api import ResumableDownloadException |
+from gslib.cloud_api import ResumableUploadAbortException |
+from gslib.cloud_api import ResumableUploadException |
+from gslib.cloud_api import ResumableUploadStartOverException |
+from gslib.cloud_api import ServiceException |
+from gslib.cloud_api_helper import ValidateDstObjectMetadata |
+from gslib.cred_types import CredTypes |
+from gslib.exception import CommandException |
+from gslib.gcs_json_media import BytesTransferredContainer |
+from gslib.gcs_json_media import DownloadCallbackConnectionClassFactory |
+from gslib.gcs_json_media import HttpWithDownloadStream |
+from gslib.gcs_json_media import HttpWithNoRetries |
+from gslib.gcs_json_media import UploadCallbackConnectionClassFactory |
+from gslib.gcs_json_media import WrapDownloadHttpRequest |
+from gslib.gcs_json_media import WrapUploadHttpRequest |
+from gslib.no_op_credentials import NoOpCredentials |
+from gslib.progress_callback import ProgressCallbackWithBackoff |
+from gslib.project_id import PopulateProjectId |
+from gslib.third_party.storage_apitools import storage_v1_client as apitools_client |
+from gslib.third_party.storage_apitools import storage_v1_messages as apitools_messages |
+from gslib.tracker_file import DeleteTrackerFile |
+from gslib.tracker_file import GetRewriteTrackerFilePath |
+from gslib.tracker_file import HashRewriteParameters |
+from gslib.tracker_file import ReadRewriteTrackerFile |
+from gslib.tracker_file import WriteRewriteTrackerFile |
+from gslib.translation_helper import CreateBucketNotFoundException |
+from gslib.translation_helper import CreateObjectNotFoundException |
+from gslib.translation_helper import DEFAULT_CONTENT_TYPE |
+from gslib.translation_helper import REMOVE_CORS_CONFIG |
+from gslib.util import GetBotoConfigFileList |
+from gslib.util import GetCertsFile |
+from gslib.util import GetCredentialStoreFilename |
+from gslib.util import GetGceCredentialCacheFilename |
+from gslib.util import GetJsonResumableChunkSize |
+from gslib.util import GetMaxRetryDelay |
+from gslib.util import GetNewHttp |
+from gslib.util import GetNumRetries |
+from gslib.util import UTF8 |
+ |
+ |
+# Implementation supports only 'gs' URLs, so provider is unused. |
+# pylint: disable=unused-argument |
+ |
+DEFAULT_GCS_JSON_VERSION = 'v1' |
+ |
+NUM_BUCKETS_PER_LIST_PAGE = 1000 |
+NUM_OBJECTS_PER_LIST_PAGE = 1000 |
+ |
+TRANSLATABLE_APITOOLS_EXCEPTIONS = (apitools_exceptions.HttpError, |
+ apitools_exceptions.StreamExhausted, |
+ apitools_exceptions.TransferError, |
+ apitools_exceptions.TransferInvalidError) |
+ |
+# TODO: Distribute these exceptions better through apitools and here. |
+# Right now, apitools is configured not to handle any exceptions on |
+# uploads/downloads. |
+# oauth2_client tries to JSON-decode the response, which can result |
+# in a ValueError if the response was invalid. Until that is fixed in |
+# oauth2_client, we need to handle it here. |
+HTTP_TRANSFER_EXCEPTIONS = (apitools_exceptions.TransferRetryError, |
+ apitools_exceptions.BadStatusCodeError, |
+ # TODO: Honor retry-after headers. |
+ apitools_exceptions.RetryAfterError, |
+ apitools_exceptions.RequestError, |
+ httplib.BadStatusLine, |
+ httplib.IncompleteRead, |
+ httplib.ResponseNotReady, |
+ httplib2.ServerNotFoundError, |
+ socket.error, |
+ socket.gaierror, |
+ socket.timeout, |
+ ssl.SSLError, |
+ ValueError) |
+ |
+_VALIDATE_CERTIFICATES_503_MESSAGE = ( |
+ """Service Unavailable. If you have recently changed |
+ https_validate_certificates from True to False in your boto configuration |
+ file, please delete any cached access tokens in your filesystem (at %s) |
+ and try again.""" % GetCredentialStoreFilename()) |
+ |
+ |
+class GcsJsonApi(CloudApi): |
+ """Google Cloud Storage JSON implementation of gsutil Cloud API.""" |
+ |
+ def __init__(self, bucket_storage_uri_class, logger, provider=None, |
+ credentials=None, debug=0): |
+ """Performs necessary setup for interacting with Google Cloud Storage. |
+ |
+ Args: |
+ bucket_storage_uri_class: Unused. |
+ logger: logging.logger for outputting log messages. |
+ provider: Unused. This implementation supports only Google Cloud Storage. |
+ credentials: Credentials to be used for interacting with Google Cloud |
+ Storage. |
+ debug: Debug level for the API implementation (0..3). |
+ """ |
+ # TODO: Plumb host_header for perfdiag / test_perfdiag. |
+ # TODO: Add jitter to apitools' http_wrapper retry mechanism. |
+ super(GcsJsonApi, self).__init__(bucket_storage_uri_class, logger, |
+ provider='gs', debug=debug) |
+ no_op_credentials = False |
+ if not credentials: |
+ loaded_credentials = self._CheckAndGetCredentials(logger) |
+ |
+ if not loaded_credentials: |
+ loaded_credentials = NoOpCredentials() |
+ no_op_credentials = True |
+ else: |
+ if isinstance(credentials, NoOpCredentials): |
+ no_op_credentials = True |
+ |
+ self.credentials = credentials or loaded_credentials |
+ |
+ self.certs_file = GetCertsFile() |
+ |
+ self.http = GetNewHttp() |
+ self.http_base = 'https://' |
+ gs_json_host = config.get('Credentials', 'gs_json_host', None) |
+ self.host_base = gs_json_host or 'www.googleapis.com' |
+ |
+ if not gs_json_host: |
+ gs_host = config.get('Credentials', 'gs_host', None) |
+ if gs_host: |
+ raise ArgumentException( |
+ 'JSON API is selected but gs_json_host is not configured, ' |
+ 'while gs_host is configured to %s. Please also configure ' |
+ 'gs_json_host and gs_json_port to match your desired endpoint.' |
+ % gs_host) |
+ |
+ gs_json_port = config.get('Credentials', 'gs_json_port', None) |
+ |
+ if not gs_json_port: |
+ gs_port = config.get('Credentials', 'gs_port', None) |
+ if gs_port: |
+ raise ArgumentException( |
+ 'JSON API is selected but gs_json_port is not configured, ' |
+ 'while gs_port is configured to %s. Please also configure ' |
+ 'gs_json_host and gs_json_port to match your desired endpoint.' |
+ % gs_port) |
+ self.host_port = '' |
+ else: |
+      self.host_port = ':' + gs_json_port |
+ |
+ self.api_version = config.get('GSUtil', 'json_api_version', |
+ DEFAULT_GCS_JSON_VERSION) |
+ self.url_base = (self.http_base + self.host_base + self.host_port + '/' + |
+ 'storage/' + self.api_version + '/') |
+ |
+ self.credentials.set_store( |
+ multistore_file.get_credential_storage_custom_string_key( |
+ GetCredentialStoreFilename(), self.api_version)) |
+ |
+ self.num_retries = GetNumRetries() |
+ |
+ log_request = (debug >= 3) |
+ log_response = (debug >= 3) |
+ |
+ self.api_client = apitools_client.StorageV1( |
+ url=self.url_base, http=self.http, log_request=log_request, |
+ log_response=log_response, credentials=self.credentials, |
+ version=self.api_version) |
+ self.api_client.num_retries = self.num_retries |
+ |
+ if no_op_credentials: |
+ # This API key is not secret and is used to identify gsutil during |
+ # anonymous requests. |
+ self.api_client.AddGlobalParam('key', |
+ u'AIzaSyDnacJHrKma0048b13sh8cgxNUwulubmJM') |
+ |
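+  # Illustrative usage (bucket name hypothetical; assumes valid boto |
+  # credentials are configured): |
+  #   api = GcsJsonApi(None, logging.getLogger(), debug=0) |
+  #   print api.GetBucket('example-bucket', fields=['location']).location |
+ |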
+ def _CheckAndGetCredentials(self, logger): |
+    configured_cred_types = [] |
+    # Initialized before the try block so that the except clause below can |
+    # test it even if an exception escapes before any authentication attempt. |
+    failed_cred_type = None |
+ try: |
+ if self._HasOauth2UserAccountCreds(): |
+ configured_cred_types.append(CredTypes.OAUTH2_USER_ACCOUNT) |
+ if self._HasOauth2ServiceAccountCreds(): |
+ configured_cred_types.append(CredTypes.OAUTH2_SERVICE_ACCOUNT) |
+ if len(configured_cred_types) > 1: |
+ # We only allow one set of configured credentials. Otherwise, we're |
+ # choosing one arbitrarily, which can be very confusing to the user |
+ # (e.g., if only one is authorized to perform some action) and can |
+ # also mask errors. |
+ # Because boto merges config files, GCE credentials show up by default |
+ # for GCE VMs. We don't want to fail when a user creates a boto file |
+ # with their own credentials, so in this case we'll use the OAuth2 |
+ # user credentials. |
+ failed_cred_type = None |
+ raise CommandException( |
+ ('You have multiple types of configured credentials (%s), which is ' |
+ 'not supported. One common way this happens is if you run gsutil ' |
+ 'config to create credentials and later run gcloud auth, and ' |
+ 'create a second set of credentials. Your boto config path is: ' |
+ '%s. For more help, see "gsutil help creds".') |
+ % (configured_cred_types, GetBotoConfigFileList())) |
+ |
+ failed_cred_type = CredTypes.OAUTH2_USER_ACCOUNT |
+ user_creds = self._GetOauth2UserAccountCreds() |
+ failed_cred_type = CredTypes.OAUTH2_SERVICE_ACCOUNT |
+ service_account_creds = self._GetOauth2ServiceAccountCreds() |
+ failed_cred_type = CredTypes.GCE |
+ gce_creds = self._GetGceCreds() |
+ failed_cred_type = CredTypes.DEVSHELL |
+ devshell_creds = self._GetDevshellCreds() |
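+      # Precedence when more than one probe succeeds: OAuth2 user account, |
+      # then service account, then GCE, then Devshell credentials. |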
+ return user_creds or service_account_creds or gce_creds or devshell_creds |
+ except: # pylint: disable=bare-except |
+ # If we didn't actually try to authenticate because there were multiple |
+ # types of configured credentials, don't emit this warning. |
+ if failed_cred_type: |
+ if os.environ.get('CLOUDSDK_WRAPPER') == '1': |
+ logger.warn( |
+ 'Your "%s" credentials are invalid. Please run\n' |
+ ' $ gcloud auth login', failed_cred_type) |
+ else: |
+ logger.warn( |
+ 'Your "%s" credentials are invalid. For more help, see ' |
+ '"gsutil help creds", or re-run the gsutil config command (see ' |
+ '"gsutil help config").', failed_cred_type) |
+ |
+ # If there's any set of configured credentials, we'll fail if they're |
+ # invalid, rather than silently falling back to anonymous config (as |
+ # boto does). That approach leads to much confusion if users don't |
+ # realize their credentials are invalid. |
+ raise |
+ |
+ def _HasOauth2ServiceAccountCreds(self): |
+ return config.has_option('Credentials', 'gs_service_key_file') |
+ |
+ def _HasOauth2UserAccountCreds(self): |
+ return config.has_option('Credentials', 'gs_oauth2_refresh_token') |
+ |
+ def _HasGceCreds(self): |
+ return config.has_option('GoogleCompute', 'service_account') |
+ |
+ def _GetOauth2ServiceAccountCreds(self): |
+ if self._HasOauth2ServiceAccountCreds(): |
+ return oauth2_helper.OAuth2ClientFromBotoConfig( |
+ boto.config, |
+ cred_type=CredTypes.OAUTH2_SERVICE_ACCOUNT).GetCredentials() |
+ |
+ def _GetOauth2UserAccountCreds(self): |
+ if self._HasOauth2UserAccountCreds(): |
+ return oauth2_helper.OAuth2ClientFromBotoConfig( |
+ boto.config).GetCredentials() |
+ |
+ def _GetGceCreds(self): |
+ if self._HasGceCreds(): |
+ try: |
+ return credentials_lib.GceAssertionCredentials( |
+ cache_filename=GetGceCredentialCacheFilename()) |
+ except apitools_exceptions.ResourceUnavailableError, e: |
+ if 'service account' in str(e) and 'does not exist' in str(e): |
+ return None |
+ raise |
+ |
+ def _GetDevshellCreds(self): |
+ try: |
+ return devshell.DevshellCredentials() |
+ except devshell.NoDevshellServer: |
+ return None |
+    except:  # pylint: disable=bare-except |
+ raise |
+ |
+ def _GetNewDownloadHttp(self, download_stream): |
+ return GetNewHttp(http_class=HttpWithDownloadStream, stream=download_stream) |
+ |
+ def _GetNewUploadHttp(self): |
+ """Returns an upload-safe Http object (by disabling httplib2 retries).""" |
+ return GetNewHttp(http_class=HttpWithNoRetries) |
+ |
+ def GetBucket(self, bucket_name, provider=None, fields=None): |
+ """See CloudApi class for function doc strings.""" |
+ projection = (apitools_messages.StorageBucketsGetRequest |
+ .ProjectionValueValuesEnum.full) |
+ apitools_request = apitools_messages.StorageBucketsGetRequest( |
+ bucket=bucket_name, projection=projection) |
+ global_params = apitools_messages.StandardQueryParameters() |
+ if fields: |
+ global_params.fields = ','.join(set(fields)) |
+ |
+    # Here and in ListBuckets, we have no way of knowing whether we |
+    # requested a field and didn't get it because it didn't exist or |
+    # because we didn't have permission to access it. |
+ try: |
+ return self.api_client.buckets.Get(apitools_request, |
+ global_params=global_params) |
+ except TRANSLATABLE_APITOOLS_EXCEPTIONS, e: |
+ self._TranslateExceptionAndRaise(e, bucket_name=bucket_name) |
+ |
+ def PatchBucket(self, bucket_name, metadata, canned_acl=None, |
+ canned_def_acl=None, preconditions=None, provider=None, |
+ fields=None): |
+ """See CloudApi class for function doc strings.""" |
+ projection = (apitools_messages.StorageBucketsPatchRequest |
+ .ProjectionValueValuesEnum.full) |
+ bucket_metadata = metadata |
+ |
+ if not preconditions: |
+ preconditions = Preconditions() |
+ |
+ # For blank metadata objects, we need to explicitly call |
+ # them out to apitools so it will send/erase them. |
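+    # (For example, a Versioning message with no fields set would otherwise |
+    # be omitted from the patch request entirely; naming it in |
+    # apitools_include_fields makes apitools serialize it explicitly so the |
+    # server erases the existing configuration.) |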
+ apitools_include_fields = [] |
+ for metadata_field in ('metadata', 'lifecycle', 'logging', 'versioning', |
+ 'website'): |
+ attr = getattr(bucket_metadata, metadata_field, None) |
+ if attr and not encoding.MessageToDict(attr): |
+ setattr(bucket_metadata, metadata_field, None) |
+ apitools_include_fields.append(metadata_field) |
+ |
+ if bucket_metadata.cors and bucket_metadata.cors == REMOVE_CORS_CONFIG: |
+ bucket_metadata.cors = [] |
+ apitools_include_fields.append('cors') |
+ |
+ predefined_acl = None |
+ if canned_acl: |
+ # Must null out existing ACLs to apply a canned ACL. |
+ apitools_include_fields.append('acl') |
+ predefined_acl = ( |
+ apitools_messages.StorageBucketsPatchRequest. |
+ PredefinedAclValueValuesEnum( |
+ self._BucketCannedAclToPredefinedAcl(canned_acl))) |
+ |
+ predefined_def_acl = None |
+ if canned_def_acl: |
+ # Must null out existing default object ACLs to apply a canned ACL. |
+ apitools_include_fields.append('defaultObjectAcl') |
+ predefined_def_acl = ( |
+ apitools_messages.StorageBucketsPatchRequest. |
+ PredefinedDefaultObjectAclValueValuesEnum( |
+ self._ObjectCannedAclToPredefinedAcl(canned_def_acl))) |
+ |
+ apitools_request = apitools_messages.StorageBucketsPatchRequest( |
+ bucket=bucket_name, bucketResource=bucket_metadata, |
+ projection=projection, |
+ ifMetagenerationMatch=preconditions.meta_gen_match, |
+ predefinedAcl=predefined_acl, |
+ predefinedDefaultObjectAcl=predefined_def_acl) |
+ global_params = apitools_messages.StandardQueryParameters() |
+ if fields: |
+ global_params.fields = ','.join(set(fields)) |
+ with self.api_client.IncludeFields(apitools_include_fields): |
+ try: |
+ return self.api_client.buckets.Patch(apitools_request, |
+ global_params=global_params) |
+ except TRANSLATABLE_APITOOLS_EXCEPTIONS, e: |
+ self._TranslateExceptionAndRaise(e) |
+ |
+ def CreateBucket(self, bucket_name, project_id=None, metadata=None, |
+ provider=None, fields=None): |
+ """See CloudApi class for function doc strings.""" |
+ projection = (apitools_messages.StorageBucketsInsertRequest |
+ .ProjectionValueValuesEnum.full) |
+ if not metadata: |
+ metadata = apitools_messages.Bucket() |
+ metadata.name = bucket_name |
+ |
+ if metadata.location: |
+ metadata.location = metadata.location.upper() |
+ if metadata.storageClass: |
+ metadata.storageClass = metadata.storageClass.upper() |
+ |
+ project_id = PopulateProjectId(project_id) |
+ |
+ apitools_request = apitools_messages.StorageBucketsInsertRequest( |
+ bucket=metadata, project=project_id, projection=projection) |
+ global_params = apitools_messages.StandardQueryParameters() |
+ if fields: |
+ global_params.fields = ','.join(set(fields)) |
+ try: |
+ return self.api_client.buckets.Insert(apitools_request, |
+ global_params=global_params) |
+ except TRANSLATABLE_APITOOLS_EXCEPTIONS, e: |
+ self._TranslateExceptionAndRaise(e, bucket_name=bucket_name) |
+ |
+ def DeleteBucket(self, bucket_name, preconditions=None, provider=None): |
+ """See CloudApi class for function doc strings.""" |
+ if not preconditions: |
+ preconditions = Preconditions() |
+ |
+ apitools_request = apitools_messages.StorageBucketsDeleteRequest( |
+ bucket=bucket_name, ifMetagenerationMatch=preconditions.meta_gen_match) |
+ |
+ try: |
+ self.api_client.buckets.Delete(apitools_request) |
+ except TRANSLATABLE_APITOOLS_EXCEPTIONS, e: |
+ if isinstance( |
+ self._TranslateApitoolsException(e, bucket_name=bucket_name), |
+ NotEmptyException): |
+ # If bucket is not empty, check to see if versioning is enabled and |
+ # signal that in the exception if it is. |
+ bucket_metadata = self.GetBucket(bucket_name, |
+ fields=['versioning']) |
+ if bucket_metadata.versioning and bucket_metadata.versioning.enabled: |
+ raise NotEmptyException('VersionedBucketNotEmpty', |
+ status=e.status_code) |
+ self._TranslateExceptionAndRaise(e, bucket_name=bucket_name) |
+ |
+ def ListBuckets(self, project_id=None, provider=None, fields=None): |
+ """See CloudApi class for function doc strings.""" |
+ projection = (apitools_messages.StorageBucketsListRequest |
+ .ProjectionValueValuesEnum.full) |
+ project_id = PopulateProjectId(project_id) |
+ |
+ apitools_request = apitools_messages.StorageBucketsListRequest( |
+ project=project_id, maxResults=NUM_BUCKETS_PER_LIST_PAGE, |
+ projection=projection) |
+ global_params = apitools_messages.StandardQueryParameters() |
+    if fields: |
+      fields = set(fields) |
+      if 'nextPageToken' not in fields: |
+        fields.add('nextPageToken') |
+      global_params.fields = ','.join(fields) |
+ try: |
+ bucket_list = self.api_client.buckets.List(apitools_request, |
+ global_params=global_params) |
+ except TRANSLATABLE_APITOOLS_EXCEPTIONS, e: |
+ self._TranslateExceptionAndRaise(e) |
+ |
+ for bucket in self._YieldBuckets(bucket_list): |
+ yield bucket |
+ |
+ while bucket_list.nextPageToken: |
+ apitools_request = apitools_messages.StorageBucketsListRequest( |
+ project=project_id, pageToken=bucket_list.nextPageToken, |
+ maxResults=NUM_BUCKETS_PER_LIST_PAGE, projection=projection) |
+ try: |
+ bucket_list = self.api_client.buckets.List(apitools_request, |
+ global_params=global_params) |
+ except TRANSLATABLE_APITOOLS_EXCEPTIONS, e: |
+ self._TranslateExceptionAndRaise(e) |
+ |
+ for bucket in self._YieldBuckets(bucket_list): |
+ yield bucket |
+ |
+ def _YieldBuckets(self, bucket_list): |
+ """Yields buckets from a list returned by apitools.""" |
+ if bucket_list.items: |
+ for bucket in bucket_list.items: |
+ yield bucket |
+ |
+ def ListObjects(self, bucket_name, prefix=None, delimiter=None, |
+ all_versions=None, provider=None, fields=None): |
+ """See CloudApi class for function doc strings.""" |
+ projection = (apitools_messages.StorageObjectsListRequest |
+ .ProjectionValueValuesEnum.full) |
+ apitools_request = apitools_messages.StorageObjectsListRequest( |
+ bucket=bucket_name, prefix=prefix, delimiter=delimiter, |
+ versions=all_versions, projection=projection, |
+ maxResults=NUM_OBJECTS_PER_LIST_PAGE) |
+ global_params = apitools_messages.StandardQueryParameters() |
+ |
+ if fields: |
+ fields = set(fields) |
+ if 'nextPageToken' not in fields: |
+ fields.add('nextPageToken') |
+ global_params.fields = ','.join(fields) |
+ |
+ try: |
+ object_list = self.api_client.objects.List(apitools_request, |
+ global_params=global_params) |
+ except TRANSLATABLE_APITOOLS_EXCEPTIONS, e: |
+ self._TranslateExceptionAndRaise(e, bucket_name=bucket_name) |
+ |
+ for object_or_prefix in self._YieldObjectsAndPrefixes(object_list): |
+ yield object_or_prefix |
+ |
+ while object_list.nextPageToken: |
+ apitools_request = apitools_messages.StorageObjectsListRequest( |
+ bucket=bucket_name, prefix=prefix, delimiter=delimiter, |
+ versions=all_versions, projection=projection, |
+ pageToken=object_list.nextPageToken, |
+ maxResults=NUM_OBJECTS_PER_LIST_PAGE) |
+ try: |
+ object_list = self.api_client.objects.List(apitools_request, |
+ global_params=global_params) |
+ except TRANSLATABLE_APITOOLS_EXCEPTIONS, e: |
+ self._TranslateExceptionAndRaise(e, bucket_name=bucket_name) |
+ |
+ for object_or_prefix in self._YieldObjectsAndPrefixes(object_list): |
+ yield object_or_prefix |
+ |
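+  # Illustrative use (bucket and prefix hypothetical): emulate a directory |
+  # listing with a delimiter; each yielded CloudApi.CsObjectOrPrefix wraps |
+  # either an object message or a prefix string. |
+  #   for entry in api.ListObjects('example-bucket', prefix='logs/', |
+  #                                delimiter='/'): |
+  #     print entry.datatype, entry.data |
+ |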
+ def _YieldObjectsAndPrefixes(self, object_list): |
+ if object_list.items: |
+ for cloud_obj in object_list.items: |
+ yield CloudApi.CsObjectOrPrefix(cloud_obj, |
+ CloudApi.CsObjectOrPrefixType.OBJECT) |
+ if object_list.prefixes: |
+ for prefix in object_list.prefixes: |
+ yield CloudApi.CsObjectOrPrefix(prefix, |
+ CloudApi.CsObjectOrPrefixType.PREFIX) |
+ |
+ def GetObjectMetadata(self, bucket_name, object_name, generation=None, |
+ provider=None, fields=None): |
+ """See CloudApi class for function doc strings.""" |
+ projection = (apitools_messages.StorageObjectsGetRequest |
+ .ProjectionValueValuesEnum.full) |
+ |
+ if generation: |
+ generation = long(generation) |
+ |
+ apitools_request = apitools_messages.StorageObjectsGetRequest( |
+ bucket=bucket_name, object=object_name, projection=projection, |
+ generation=generation) |
+ global_params = apitools_messages.StandardQueryParameters() |
+ if fields: |
+ global_params.fields = ','.join(set(fields)) |
+ |
+ try: |
+ return self.api_client.objects.Get(apitools_request, |
+ global_params=global_params) |
+ except TRANSLATABLE_APITOOLS_EXCEPTIONS, e: |
+ self._TranslateExceptionAndRaise(e, bucket_name=bucket_name, |
+ object_name=object_name, |
+ generation=generation) |
+ |
+ def GetObjectMedia( |
+ self, bucket_name, object_name, download_stream, |
+ provider=None, generation=None, object_size=None, |
+ download_strategy=CloudApi.DownloadStrategy.ONE_SHOT, start_byte=0, |
+ end_byte=None, progress_callback=None, serialization_data=None, |
+ digesters=None): |
+ """See CloudApi class for function doc strings.""" |
+    # This implementation fetches the object metadata first unless it is |
+    # passed in via serialization_data. |
+ if generation: |
+ generation = long(generation) |
+ |
+ outer_total_size = object_size |
+ if serialization_data: |
+ outer_total_size = json.loads(serialization_data)['total_size'] |
+ |
+ if progress_callback: |
+ if outer_total_size is None: |
+ raise ArgumentException('Download size is required when callbacks are ' |
+ 'requested for a download, but no size was ' |
+ 'provided.') |
+ progress_callback(0, outer_total_size) |
+ |
+ bytes_downloaded_container = BytesTransferredContainer() |
+ bytes_downloaded_container.bytes_transferred = start_byte |
+ |
+ callback_class_factory = DownloadCallbackConnectionClassFactory( |
+ bytes_downloaded_container, total_size=outer_total_size, |
+ progress_callback=progress_callback, digesters=digesters) |
+ download_http_class = callback_class_factory.GetConnectionClass() |
+ |
+ download_http = self._GetNewDownloadHttp(download_stream) |
+ download_http.connections = {'https': download_http_class} |
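+    # Substituting the connection class lets us observe raw bytes as they |
+    # arrive, driving the progress callback and any hash digesters. |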
+ authorized_download_http = self.credentials.authorize(download_http) |
+ WrapDownloadHttpRequest(authorized_download_http) |
+ |
+ if serialization_data: |
+ apitools_download = apitools_transfer.Download.FromData( |
+ download_stream, serialization_data, self.api_client.http, |
+ num_retries=self.num_retries) |
+ else: |
+ apitools_download = apitools_transfer.Download.FromStream( |
+ download_stream, auto_transfer=False, total_size=object_size, |
+ num_retries=self.num_retries) |
+ |
+ apitools_download.bytes_http = authorized_download_http |
+ apitools_request = apitools_messages.StorageObjectsGetRequest( |
+ bucket=bucket_name, object=object_name, generation=generation) |
+ |
+ try: |
+ if download_strategy == CloudApi.DownloadStrategy.RESUMABLE: |
+ # Disable retries in apitools. We will handle them explicitly here. |
+ apitools_download.retry_func = ( |
+ apitools_http_wrapper.RethrowExceptionHandler) |
+ return self._PerformResumableDownload( |
+ bucket_name, object_name, download_stream, apitools_request, |
+ apitools_download, bytes_downloaded_container, |
+ generation=generation, start_byte=start_byte, end_byte=end_byte, |
+ serialization_data=serialization_data) |
+ else: |
+ return self._PerformDownload( |
+ bucket_name, object_name, download_stream, apitools_request, |
+ apitools_download, generation=generation, start_byte=start_byte, |
+ end_byte=end_byte, serialization_data=serialization_data) |
+ except TRANSLATABLE_APITOOLS_EXCEPTIONS, e: |
+ self._TranslateExceptionAndRaise(e, bucket_name=bucket_name, |
+ object_name=object_name, |
+ generation=generation) |
+ |
+ def _PerformResumableDownload( |
+ self, bucket_name, object_name, download_stream, apitools_request, |
+ apitools_download, bytes_downloaded_container, generation=None, |
+ start_byte=0, end_byte=None, serialization_data=None): |
+ retries = 0 |
+ last_progress_byte = start_byte |
+ while retries <= self.num_retries: |
+ try: |
+ return self._PerformDownload( |
+ bucket_name, object_name, download_stream, apitools_request, |
+ apitools_download, generation=generation, start_byte=start_byte, |
+ end_byte=end_byte, serialization_data=serialization_data) |
+ except HTTP_TRANSFER_EXCEPTIONS, e: |
+ start_byte = download_stream.tell() |
+ bytes_downloaded_container.bytes_transferred = start_byte |
+ if start_byte > last_progress_byte: |
+ # We've made progress, so allow a fresh set of retries. |
+ last_progress_byte = start_byte |
+ retries = 0 |
+ retries += 1 |
+ if retries > self.num_retries: |
+ raise ResumableDownloadException( |
+ 'Transfer failed after %d retries. Final exception: %s' % |
+ (self.num_retries, unicode(e).encode(UTF8))) |
+ time.sleep(CalculateWaitForRetry(retries, max_wait=GetMaxRetryDelay())) |
+ if self.logger.isEnabledFor(logging.DEBUG): |
+ self.logger.debug( |
+ 'Retrying download from byte %s after exception: %s. Trace: %s', |
+ start_byte, unicode(e).encode(UTF8), traceback.format_exc()) |
+ apitools_http_wrapper.RebuildHttpConnections( |
+ apitools_download.bytes_http) |
+ |
+ def _PerformDownload( |
+ self, bucket_name, object_name, download_stream, apitools_request, |
+ apitools_download, generation=None, start_byte=0, end_byte=None, |
+ serialization_data=None): |
+ if not serialization_data: |
+ try: |
+ self.api_client.objects.Get(apitools_request, |
+ download=apitools_download) |
+ except TRANSLATABLE_APITOOLS_EXCEPTIONS, e: |
+ self._TranslateExceptionAndRaise(e, bucket_name=bucket_name, |
+ object_name=object_name, |
+ generation=generation) |
+ |
+ # Disable apitools' default print callbacks. |
+ def _NoOpCallback(unused_response, unused_download_object): |
+ pass |
+ |
+    # TODO: If we have a resumable download with accept-encoding:gzip |
+    # on an object that is compressible but not stored in gzip form in the |
+    # cloud, on-the-fly compression will gzip the object. In this case, if |
+    # our download breaks, future requests will ignore the range header and |
+    # just return the object (gzipped) in its entirety. Ideally, we would |
+    # unzip the bytes that we have locally and send a range request without |
+    # accept-encoding:gzip so that we download only the (uncompressed) bytes |
+    # that we don't yet have. |
+ |
+ # Since bytes_http is created in this function, we don't get the |
+ # user-agent header from api_client's http automatically. |
+ additional_headers = { |
+ 'accept-encoding': 'gzip', |
+ 'user-agent': self.api_client.user_agent |
+ } |
+ if start_byte or end_byte: |
+ apitools_download.GetRange(additional_headers=additional_headers, |
+ start=start_byte, end=end_byte, |
+ use_chunks=False) |
+ else: |
+ apitools_download.StreamMedia( |
+ callback=_NoOpCallback, finish_callback=_NoOpCallback, |
+ additional_headers=additional_headers, use_chunks=False) |
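+    # Return the server-reported content encoding so that callers can |
+    # detect gzip-transcoded downloads. |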
+ return apitools_download.encoding |
+ |
+ def PatchObjectMetadata(self, bucket_name, object_name, metadata, |
+ canned_acl=None, generation=None, preconditions=None, |
+ provider=None, fields=None): |
+ """See CloudApi class for function doc strings.""" |
+ projection = (apitools_messages.StorageObjectsPatchRequest |
+ .ProjectionValueValuesEnum.full) |
+ |
+ if not preconditions: |
+ preconditions = Preconditions() |
+ |
+ if generation: |
+ generation = long(generation) |
+ |
+ predefined_acl = None |
+ apitools_include_fields = [] |
+ if canned_acl: |
+ # Must null out existing ACLs to apply a canned ACL. |
+ apitools_include_fields.append('acl') |
+ predefined_acl = ( |
+ apitools_messages.StorageObjectsPatchRequest. |
+ PredefinedAclValueValuesEnum( |
+ self._ObjectCannedAclToPredefinedAcl(canned_acl))) |
+ |
+ apitools_request = apitools_messages.StorageObjectsPatchRequest( |
+ bucket=bucket_name, object=object_name, objectResource=metadata, |
+ generation=generation, projection=projection, |
+ ifGenerationMatch=preconditions.gen_match, |
+ ifMetagenerationMatch=preconditions.meta_gen_match, |
+ predefinedAcl=predefined_acl) |
+ global_params = apitools_messages.StandardQueryParameters() |
+ if fields: |
+ global_params.fields = ','.join(set(fields)) |
+ |
+ try: |
+ with self.api_client.IncludeFields(apitools_include_fields): |
+ return self.api_client.objects.Patch(apitools_request, |
+ global_params=global_params) |
+ except TRANSLATABLE_APITOOLS_EXCEPTIONS, e: |
+ self._TranslateExceptionAndRaise(e, bucket_name=bucket_name, |
+ object_name=object_name, |
+ generation=generation) |
+ |
+ def _UploadObject(self, upload_stream, object_metadata, canned_acl=None, |
+ size=None, preconditions=None, provider=None, fields=None, |
+ serialization_data=None, tracker_callback=None, |
+ progress_callback=None, |
+ apitools_strategy=apitools_transfer.SIMPLE_UPLOAD, |
+ total_size=0): |
+ # pylint: disable=g-doc-args |
+ """Upload implementation. Cloud API arguments, plus two more. |
+ |
+ Additional args: |
+ apitools_strategy: SIMPLE_UPLOAD or RESUMABLE_UPLOAD. |
+ total_size: Total size of the upload; None if it is unknown (streaming). |
+ |
+ Returns: |
+ Uploaded object metadata. |
+ """ |
+ # pylint: enable=g-doc-args |
+ ValidateDstObjectMetadata(object_metadata) |
+ predefined_acl = None |
+ if canned_acl: |
+ predefined_acl = ( |
+ apitools_messages.StorageObjectsInsertRequest. |
+ PredefinedAclValueValuesEnum( |
+ self._ObjectCannedAclToPredefinedAcl(canned_acl))) |
+ |
+ bytes_uploaded_container = BytesTransferredContainer() |
+ |
+ if progress_callback and size: |
+ total_size = size |
+ progress_callback(0, size) |
+ |
+ callback_class_factory = UploadCallbackConnectionClassFactory( |
+ bytes_uploaded_container, total_size=total_size, |
+ progress_callback=progress_callback) |
+ |
+ upload_http = self._GetNewUploadHttp() |
+ upload_http_class = callback_class_factory.GetConnectionClass() |
+ upload_http.connections = {'http': upload_http_class, |
+ 'https': upload_http_class} |
+ |
+ authorized_upload_http = self.credentials.authorize(upload_http) |
+ WrapUploadHttpRequest(authorized_upload_http) |
+ # Since bytes_http is created in this function, we don't get the |
+ # user-agent header from api_client's http automatically. |
+ additional_headers = { |
+ 'user-agent': self.api_client.user_agent |
+ } |
+ |
+ try: |
+ content_type = None |
+ apitools_request = None |
+ global_params = None |
+ if not serialization_data: |
+ # This is a new upload, set up initial upload state. |
+ content_type = object_metadata.contentType |
+ if not content_type: |
+ content_type = DEFAULT_CONTENT_TYPE |
+ |
+ if not preconditions: |
+ preconditions = Preconditions() |
+ |
+ apitools_request = apitools_messages.StorageObjectsInsertRequest( |
+ bucket=object_metadata.bucket, object=object_metadata, |
+ ifGenerationMatch=preconditions.gen_match, |
+ ifMetagenerationMatch=preconditions.meta_gen_match, |
+ predefinedAcl=predefined_acl) |
+ global_params = apitools_messages.StandardQueryParameters() |
+ if fields: |
+ global_params.fields = ','.join(set(fields)) |
+ |
+ if apitools_strategy == apitools_transfer.SIMPLE_UPLOAD: |
+ # One-shot upload. |
+ apitools_upload = apitools_transfer.Upload( |
+ upload_stream, content_type, total_size=size, auto_transfer=True, |
+ num_retries=self.num_retries) |
+ apitools_upload.strategy = apitools_strategy |
+ apitools_upload.bytes_http = authorized_upload_http |
+ |
+ return self.api_client.objects.Insert( |
+ apitools_request, |
+ upload=apitools_upload, |
+ global_params=global_params) |
+ else: # Resumable upload. |
+ return self._PerformResumableUpload( |
+ upload_stream, authorized_upload_http, content_type, size, |
+ serialization_data, apitools_strategy, apitools_request, |
+ global_params, bytes_uploaded_container, tracker_callback, |
+ additional_headers, progress_callback) |
+ except TRANSLATABLE_APITOOLS_EXCEPTIONS, e: |
+ self._TranslateExceptionAndRaise(e, bucket_name=object_metadata.bucket, |
+ object_name=object_metadata.name) |
+ |
+ def _PerformResumableUpload( |
+ self, upload_stream, authorized_upload_http, content_type, size, |
+ serialization_data, apitools_strategy, apitools_request, global_params, |
+ bytes_uploaded_container, tracker_callback, addl_headers, |
+ progress_callback): |
+ try: |
+ if serialization_data: |
+ # Resuming an existing upload. |
+ apitools_upload = apitools_transfer.Upload.FromData( |
+ upload_stream, serialization_data, self.api_client.http, |
+ num_retries=self.num_retries) |
+ apitools_upload.chunksize = GetJsonResumableChunkSize() |
+ apitools_upload.bytes_http = authorized_upload_http |
+ else: |
+ # New resumable upload. |
+ apitools_upload = apitools_transfer.Upload( |
+ upload_stream, content_type, total_size=size, |
+ chunksize=GetJsonResumableChunkSize(), auto_transfer=False, |
+ num_retries=self.num_retries) |
+ apitools_upload.strategy = apitools_strategy |
+ apitools_upload.bytes_http = authorized_upload_http |
+ self.api_client.objects.Insert( |
+ apitools_request, |
+ upload=apitools_upload, |
+ global_params=global_params) |
+ # Disable retries in apitools. We will handle them explicitly here. |
+ apitools_upload.retry_func = ( |
+ apitools_http_wrapper.RethrowExceptionHandler) |
+ |
+ # Disable apitools' default print callbacks. |
+ def _NoOpCallback(unused_response, unused_upload_object): |
+ pass |
+ |
+ # If we're resuming an upload, apitools has at this point received |
+ # from the server how many bytes it already has. Update our |
+ # callback class with this information. |
+ bytes_uploaded_container.bytes_transferred = apitools_upload.progress |
+ if tracker_callback: |
+ tracker_callback(json.dumps(apitools_upload.serialization_data)) |
+ |
+ retries = 0 |
+ last_progress_byte = apitools_upload.progress |
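+      # Retry strategy: on a transfer exception, ask the server how many |
+      # bytes it has (RefreshResumableUploadState below) and resume from |
+      # there; the retry budget is reset whenever forward progress is made. |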
+ while retries <= self.num_retries: |
+ try: |
+ # TODO: On retry, this will seek to the bytes that the server has, |
+ # causing the hash to be recalculated. Make HashingFileUploadWrapper |
+ # save a digest according to json_resumable_chunk_size. |
+ if size: |
+ # If size is known, we can send it all in one request and avoid |
+ # making a round-trip per chunk. |
+ http_response = apitools_upload.StreamMedia( |
+ callback=_NoOpCallback, finish_callback=_NoOpCallback, |
+ additional_headers=addl_headers) |
+ else: |
+ # Otherwise it's a streaming request and we need to ensure that we |
+ # send the bytes in chunks so that we can guarantee that we never |
+ # need to seek backwards more than our buffer (and also that the |
+ # chunks are aligned to 256KB). |
+ http_response = apitools_upload.StreamInChunks( |
+ callback=_NoOpCallback, finish_callback=_NoOpCallback, |
+ additional_headers=addl_headers) |
+ processed_response = self.api_client.objects.ProcessHttpResponse( |
+ self.api_client.objects.GetMethodConfig('Insert'), http_response) |
+ if size is None and progress_callback: |
+ # Make final progress callback; total size should now be known. |
+            # This works around the fact that the send function counts |
+            # header bytes. |
+ # However, this will make the progress appear to go slightly |
+ # backwards at the end. |
+ progress_callback(apitools_upload.total_size, |
+ apitools_upload.total_size) |
+ return processed_response |
+ except HTTP_TRANSFER_EXCEPTIONS, e: |
+ apitools_http_wrapper.RebuildHttpConnections( |
+ apitools_upload.bytes_http) |
+ while retries <= self.num_retries: |
+ try: |
+ # TODO: Simulate the refresh case in tests. Right now, our |
+ # mocks are not complex enough to simulate a failure. |
+ apitools_upload.RefreshResumableUploadState() |
+ start_byte = apitools_upload.progress |
+ bytes_uploaded_container.bytes_transferred = start_byte |
+ break |
+ except HTTP_TRANSFER_EXCEPTIONS, e2: |
+ apitools_http_wrapper.RebuildHttpConnections( |
+ apitools_upload.bytes_http) |
+ retries += 1 |
+ if retries > self.num_retries: |
+ raise ResumableUploadException( |
+ 'Transfer failed after %d retries. Final exception: %s' % |
+ (self.num_retries, e2)) |
+ time.sleep( |
+ CalculateWaitForRetry(retries, max_wait=GetMaxRetryDelay())) |
+ if start_byte > last_progress_byte: |
+ # We've made progress, so allow a fresh set of retries. |
+ last_progress_byte = start_byte |
+ retries = 0 |
+ else: |
+ retries += 1 |
+ if retries > self.num_retries: |
+ raise ResumableUploadException( |
+ 'Transfer failed after %d retries. Final exception: %s' % |
+ (self.num_retries, unicode(e).encode(UTF8))) |
+ time.sleep( |
+ CalculateWaitForRetry(retries, max_wait=GetMaxRetryDelay())) |
+ if self.logger.isEnabledFor(logging.DEBUG): |
+ self.logger.debug( |
+ 'Retrying upload from byte %s after exception: %s. Trace: %s', |
+ start_byte, unicode(e).encode(UTF8), traceback.format_exc()) |
+ except TRANSLATABLE_APITOOLS_EXCEPTIONS, e: |
+ resumable_ex = self._TranslateApitoolsResumableUploadException(e) |
+ if resumable_ex: |
+ raise resumable_ex |
+ else: |
+ raise |
+ |
+ def UploadObject(self, upload_stream, object_metadata, canned_acl=None, |
+ size=None, preconditions=None, progress_callback=None, |
+ provider=None, fields=None): |
+ """See CloudApi class for function doc strings.""" |
+ return self._UploadObject( |
+ upload_stream, object_metadata, canned_acl=canned_acl, |
+ size=size, preconditions=preconditions, |
+ progress_callback=progress_callback, fields=fields, |
+ apitools_strategy=apitools_transfer.SIMPLE_UPLOAD) |
+ |
+ def UploadObjectStreaming(self, upload_stream, object_metadata, |
+ canned_acl=None, preconditions=None, |
+ progress_callback=None, provider=None, |
+ fields=None): |
+ """See CloudApi class for function doc strings.""" |
+    # A streaming upload is indicated by not passing a size. |
+    # Such uploads are resumable only up to the resumable chunk size, using |
+    # a buffered stream. |
+ return self._UploadObject( |
+ upload_stream, object_metadata, canned_acl=canned_acl, |
+ preconditions=preconditions, progress_callback=progress_callback, |
+ fields=fields, apitools_strategy=apitools_transfer.RESUMABLE_UPLOAD, |
+ total_size=None) |
+ |
+ def UploadObjectResumable( |
+ self, upload_stream, object_metadata, canned_acl=None, preconditions=None, |
+ provider=None, fields=None, size=None, serialization_data=None, |
+ tracker_callback=None, progress_callback=None): |
+ """See CloudApi class for function doc strings.""" |
+ return self._UploadObject( |
+ upload_stream, object_metadata, canned_acl=canned_acl, |
+ preconditions=preconditions, fields=fields, size=size, |
+ serialization_data=serialization_data, |
+ tracker_callback=tracker_callback, progress_callback=progress_callback, |
+ apitools_strategy=apitools_transfer.RESUMABLE_UPLOAD) |
+ |
+ def CopyObject(self, src_obj_metadata, dst_obj_metadata, src_generation=None, |
+ canned_acl=None, preconditions=None, progress_callback=None, |
+ max_bytes_per_call=None, provider=None, fields=None): |
+ """See CloudApi class for function doc strings.""" |
+ ValidateDstObjectMetadata(dst_obj_metadata) |
+ predefined_acl = None |
+ if canned_acl: |
+ predefined_acl = ( |
+ apitools_messages.StorageObjectsRewriteRequest. |
+ DestinationPredefinedAclValueValuesEnum( |
+ self._ObjectCannedAclToPredefinedAcl(canned_acl))) |
+ |
+ if src_generation: |
+ src_generation = long(src_generation) |
+ |
+ if not preconditions: |
+ preconditions = Preconditions() |
+ |
+ projection = (apitools_messages.StorageObjectsRewriteRequest. |
+ ProjectionValueValuesEnum.full) |
+ global_params = apitools_messages.StandardQueryParameters() |
+ if fields: |
+ # Rewrite returns the resultant object under the 'resource' field. |
+ new_fields = set(['done', 'objectSize', 'rewriteToken', |
+ 'totalBytesRewritten']) |
+ for field in fields: |
+ new_fields.add('resource/' + field) |
+ global_params.fields = ','.join(set(new_fields)) |
+ |
+ # Check to see if we are resuming a rewrite. |
+ tracker_file_name = GetRewriteTrackerFilePath( |
+ src_obj_metadata.bucket, src_obj_metadata.name, dst_obj_metadata.bucket, |
+ dst_obj_metadata.name, 'JSON') |
+ rewrite_params_hash = HashRewriteParameters( |
+ src_obj_metadata, dst_obj_metadata, projection, |
+ src_generation=src_generation, gen_match=preconditions.gen_match, |
+ meta_gen_match=preconditions.meta_gen_match, |
+ canned_acl=predefined_acl, fields=global_params.fields, |
+ max_bytes_per_call=max_bytes_per_call) |
+ resume_rewrite_token = ReadRewriteTrackerFile(tracker_file_name, |
+ rewrite_params_hash) |
+ |
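+    # Note: the resume token read above is only honored when the saved |
+    # parameters hash matches, so a tracker file left over from a different |
+    # rewrite is ignored. |
+ |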
+ progress_cb_with_backoff = None |
+ try: |
+ last_bytes_written = 0L |
+ while True: |
+ apitools_request = apitools_messages.StorageObjectsRewriteRequest( |
+ sourceBucket=src_obj_metadata.bucket, |
+ sourceObject=src_obj_metadata.name, |
+ destinationBucket=dst_obj_metadata.bucket, |
+ destinationObject=dst_obj_metadata.name, |
+ projection=projection, object=dst_obj_metadata, |
+ sourceGeneration=src_generation, |
+ ifGenerationMatch=preconditions.gen_match, |
+ ifMetagenerationMatch=preconditions.meta_gen_match, |
+ destinationPredefinedAcl=predefined_acl, |
+ rewriteToken=resume_rewrite_token, |
+ maxBytesRewrittenPerCall=max_bytes_per_call) |
+ rewrite_response = self.api_client.objects.Rewrite( |
+ apitools_request, global_params=global_params) |
+ bytes_written = long(rewrite_response.totalBytesRewritten) |
+ if progress_callback and not progress_cb_with_backoff: |
+ progress_cb_with_backoff = ProgressCallbackWithBackoff( |
+ long(rewrite_response.objectSize), progress_callback) |
+ if progress_cb_with_backoff: |
+ progress_cb_with_backoff.Progress( |
+ bytes_written - last_bytes_written) |
+ |
+ if rewrite_response.done: |
+ break |
+ elif not resume_rewrite_token: |
+ # Save the token and make a tracker file if they don't already exist. |
+ resume_rewrite_token = rewrite_response.rewriteToken |
+ WriteRewriteTrackerFile(tracker_file_name, rewrite_params_hash, |
+ rewrite_response.rewriteToken) |
+ last_bytes_written = bytes_written |
+ |
+ DeleteTrackerFile(tracker_file_name) |
+ return rewrite_response.resource |
+ except TRANSLATABLE_APITOOLS_EXCEPTIONS, e: |
+ self._TranslateExceptionAndRaise(e, bucket_name=dst_obj_metadata.bucket, |
+ object_name=dst_obj_metadata.name) |
+ |
+ def DeleteObject(self, bucket_name, object_name, preconditions=None, |
+ generation=None, provider=None): |
+ """See CloudApi class for function doc strings.""" |
+ if not preconditions: |
+ preconditions = Preconditions() |
+ |
+ if generation: |
+ generation = long(generation) |
+ |
+ apitools_request = apitools_messages.StorageObjectsDeleteRequest( |
+ bucket=bucket_name, object=object_name, generation=generation, |
+ ifGenerationMatch=preconditions.gen_match, |
+ ifMetagenerationMatch=preconditions.meta_gen_match) |
+ try: |
+ return self.api_client.objects.Delete(apitools_request) |
+ except TRANSLATABLE_APITOOLS_EXCEPTIONS, e: |
+ self._TranslateExceptionAndRaise(e, bucket_name=bucket_name, |
+ object_name=object_name, |
+ generation=generation) |
+ |
+ def ComposeObject(self, src_objs_metadata, dst_obj_metadata, |
+ preconditions=None, provider=None, fields=None): |
+ """See CloudApi class for function doc strings.""" |
+ ValidateDstObjectMetadata(dst_obj_metadata) |
+ |
+ dst_obj_name = dst_obj_metadata.name |
+ dst_obj_metadata.name = None |
+ dst_bucket_name = dst_obj_metadata.bucket |
+ dst_obj_metadata.bucket = None |
+ if not dst_obj_metadata.contentType: |
+ dst_obj_metadata.contentType = DEFAULT_CONTENT_TYPE |
+ |
+ if not preconditions: |
+ preconditions = Preconditions() |
+ |
+ global_params = apitools_messages.StandardQueryParameters() |
+ if fields: |
+ global_params.fields = ','.join(set(fields)) |
+ |
+ src_objs_compose_request = apitools_messages.ComposeRequest( |
+ sourceObjects=src_objs_metadata, destination=dst_obj_metadata) |
+ |
+ apitools_request = apitools_messages.StorageObjectsComposeRequest( |
+ composeRequest=src_objs_compose_request, |
+ destinationBucket=dst_bucket_name, |
+ destinationObject=dst_obj_name, |
+ ifGenerationMatch=preconditions.gen_match, |
+ ifMetagenerationMatch=preconditions.meta_gen_match) |
+ try: |
+ return self.api_client.objects.Compose(apitools_request, |
+ global_params=global_params) |
+ except TRANSLATABLE_APITOOLS_EXCEPTIONS, e: |
+ # We can't be sure which object was missing in the 404 case. |
+ if isinstance(e, apitools_exceptions.HttpError) and e.status_code == 404: |
+ raise NotFoundException('One of the source objects does not exist.') |
+ else: |
+ self._TranslateExceptionAndRaise(e) |
+ |
+ def WatchBucket(self, bucket_name, address, channel_id, token=None, |
+ provider=None, fields=None): |
+ """See CloudApi class for function doc strings.""" |
+ projection = (apitools_messages.StorageObjectsWatchAllRequest |
+ .ProjectionValueValuesEnum.full) |
+ |
+ channel = apitools_messages.Channel(address=address, id=channel_id, |
+ token=token, type='WEB_HOOK') |
+ |
+ apitools_request = apitools_messages.StorageObjectsWatchAllRequest( |
+ bucket=bucket_name, channel=channel, projection=projection) |
+ |
+ global_params = apitools_messages.StandardQueryParameters() |
+ if fields: |
+ global_params.fields = ','.join(set(fields)) |
+ |
+ try: |
+ return self.api_client.objects.WatchAll(apitools_request, |
+ global_params=global_params) |
+ except TRANSLATABLE_APITOOLS_EXCEPTIONS, e: |
+ self._TranslateExceptionAndRaise(e, bucket_name=bucket_name) |
+ |
+ def StopChannel(self, channel_id, resource_id, provider=None): |
+ """See CloudApi class for function doc strings.""" |
+ channel = apitools_messages.Channel(id=channel_id, resourceId=resource_id) |
+ try: |
+ self.api_client.channels.Stop(channel) |
+ except TRANSLATABLE_APITOOLS_EXCEPTIONS, e: |
+ self._TranslateExceptionAndRaise(e) |
+ |
+ def _BucketCannedAclToPredefinedAcl(self, canned_acl_string): |
+ """Translates the input string to a bucket PredefinedAcl string. |
+ |
+ Args: |
+ canned_acl_string: Canned ACL string. |
+ |
+ Returns: |
+ String that can be used as a query parameter with the JSON API. This |
+ corresponds to a flavor of *PredefinedAclValueValuesEnum and can be |
+ used as input to apitools requests that affect bucket access controls. |
+ """ |
+ # XML : JSON |
+ translation_dict = { |
+ None: None, |
+ 'authenticated-read': 'authenticatedRead', |
+ 'private': 'private', |
+ 'project-private': 'projectPrivate', |
+ 'public-read': 'publicRead', |
+ 'public-read-write': 'publicReadWrite' |
+ } |
+ if canned_acl_string in translation_dict: |
+ return translation_dict[canned_acl_string] |
+ raise ArgumentException('Invalid canned ACL %s' % canned_acl_string) |
+ |
+ def _ObjectCannedAclToPredefinedAcl(self, canned_acl_string): |
+ """Translates the input string to an object PredefinedAcl string. |
+ |
+ Args: |
+ canned_acl_string: Canned ACL string. |
+ |
+ Returns: |
+ String that can be used as a query parameter with the JSON API. This |
+ corresponds to a flavor of *PredefinedAclValueValuesEnum and can be |
+ used as input to apitools requests that affect object access controls. |
+ """ |
+ # XML : JSON |
+ translation_dict = { |
+ None: None, |
+ 'authenticated-read': 'authenticatedRead', |
+ 'bucket-owner-read': 'bucketOwnerRead', |
+ 'bucket-owner-full-control': 'bucketOwnerFullControl', |
+ 'private': 'private', |
+ 'project-private': 'projectPrivate', |
+ 'public-read': 'publicRead' |
+ } |
+ if canned_acl_string in translation_dict: |
+ return translation_dict[canned_acl_string] |
+ raise ArgumentException('Invalid canned ACL %s' % canned_acl_string) |
+ |
+ def _TranslateExceptionAndRaise(self, e, bucket_name=None, object_name=None, |
+ generation=None): |
+ """Translates an HTTP exception and raises the translated or original value. |
+ |
+ Args: |
+ e: Any Exception. |
+ bucket_name: Optional bucket name in request that caused the exception. |
+ object_name: Optional object name in request that caused the exception. |
+ generation: Optional generation in request that caused the exception. |
+ |
+ Raises: |
+ Translated CloudApi exception, or the original exception if it was not |
+ translatable. |
+ """ |
+ translated_exception = self._TranslateApitoolsException( |
+ e, bucket_name=bucket_name, object_name=object_name, |
+ generation=generation) |
+ if translated_exception: |
+ raise translated_exception |
+ else: |
+ raise |
+ |
+ def _GetMessageFromHttpError(self, http_error): |
+ if isinstance(http_error, apitools_exceptions.HttpError): |
+ if getattr(http_error, 'content', None): |
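+        # The content is expected to be a JSON error document, e.g. |
+        # (illustrative): {"error": {"code": 403, "message": "Forbidden"}}. |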
+ try: |
+ json_obj = json.loads(http_error.content) |
+ if 'error' in json_obj and 'message' in json_obj['error']: |
+ return json_obj['error']['message'] |
+ except Exception: # pylint: disable=broad-except |
+ # If we couldn't decode anything, just leave the message as None. |
+ pass |
+ |
+ def _TranslateApitoolsResumableUploadException( |
+ self, e, bucket_name=None, object_name=None, generation=None): |
+ if isinstance(e, apitools_exceptions.HttpError): |
+ message = self._GetMessageFromHttpError(e) |
+ if (e.status_code == 503 and |
+ self.http.disable_ssl_certificate_validation): |
+ return ServiceException(_VALIDATE_CERTIFICATES_503_MESSAGE, |
+ status=e.status_code) |
+ elif e.status_code >= 500: |
+ return ResumableUploadException( |
+ message or 'Server Error', status=e.status_code) |
+ elif e.status_code == 429: |
+ return ResumableUploadException( |
+ message or 'Too Many Requests', status=e.status_code) |
+      elif e.status_code in (404, 410): |
+        # Both codes mean the resumable session can no longer be used and |
+        # the upload must be restarted from the beginning. |
+        return ResumableUploadStartOverException( |
+            message or 'Bad Request', status=e.status_code) |
+ elif e.status_code >= 400: |
+ return ResumableUploadAbortException( |
+ message or 'Bad Request', status=e.status_code) |
+ if isinstance(e, apitools_exceptions.StreamExhausted): |
+ return ResumableUploadAbortException(e.message) |
+ if (isinstance(e, apitools_exceptions.TransferError) and |
+ ('Aborting transfer' in e.message or |
+ 'Not enough bytes in stream' in e.message or |
+ 'additional bytes left in stream' in e.message)): |
+ return ResumableUploadAbortException(e.message) |
+ |
+ def _TranslateApitoolsException(self, e, bucket_name=None, object_name=None, |
+ generation=None): |
+ """Translates apitools exceptions into their gsutil Cloud Api equivalents. |
+ |
+ Args: |
+ e: Any exception in TRANSLATABLE_APITOOLS_EXCEPTIONS. |
+ bucket_name: Optional bucket name in request that caused the exception. |
+ object_name: Optional object name in request that caused the exception. |
+ generation: Optional generation in request that caused the exception. |
+ |
+ Returns: |
+ CloudStorageApiServiceException for translatable exceptions, None |
+ otherwise. |
+ """ |
+ if isinstance(e, apitools_exceptions.HttpError): |
+ message = self._GetMessageFromHttpError(e) |
+ if e.status_code == 400: |
+ # It is possible that the Project ID is incorrect. Unfortunately the |
+ # JSON API does not give us much information about what part of the |
+ # request was bad. |
+ return BadRequestException(message or 'Bad Request', |
+ status=e.status_code) |
+ elif e.status_code == 401: |
+ if 'Login Required' in str(e): |
+ return AccessDeniedException( |
+ message or 'Access denied: login required.', |
+ status=e.status_code) |
+ elif e.status_code == 403: |
+ if 'The account for the specified project has been disabled' in str(e): |
+ return AccessDeniedException(message or 'Account disabled.', |
+ status=e.status_code) |
+ elif 'Daily Limit for Unauthenticated Use Exceeded' in str(e): |
+ return AccessDeniedException( |
+ message or 'Access denied: quota exceeded. ' |
+ 'Is your project ID valid?', |
+ status=e.status_code) |
+ elif 'The bucket you tried to delete was not empty.' in str(e): |
+ return NotEmptyException('BucketNotEmpty (%s)' % bucket_name, |
+ status=e.status_code) |
+ elif ('The bucket you tried to create requires domain ownership ' |
+ 'verification.' in str(e)): |
+ return AccessDeniedException( |
+ 'The bucket you tried to create requires domain ownership ' |
+ 'verification. Please see ' |
+ 'https://developers.google.com/storage/docs/bucketnaming' |
+ '?hl=en#verification for more details.', status=e.status_code) |
+ elif 'User Rate Limit Exceeded' in str(e): |
+ return AccessDeniedException('Rate limit exceeded. Please retry this ' |
+ 'request later.', status=e.status_code) |
+ elif 'Access Not Configured' in str(e): |
+ return AccessDeniedException( |
+ 'Access Not Configured. Please go to the Google Developers ' |
+ 'Console (https://cloud.google.com/console#/project) for your ' |
+ 'project, select APIs and Auth and enable the ' |
+ 'Google Cloud Storage JSON API.', |
+ status=e.status_code) |
+ else: |
+ return AccessDeniedException(message or e.message, |
+ status=e.status_code) |
+ elif e.status_code == 404: |
+ if bucket_name: |
+ if object_name: |
+ return CreateObjectNotFoundException(e.status_code, self.provider, |
+ bucket_name, object_name, |
+ generation=generation) |
+ return CreateBucketNotFoundException(e.status_code, self.provider, |
+ bucket_name) |
+ return NotFoundException(e.message, status=e.status_code) |
+ elif e.status_code == 409 and bucket_name: |
+ if 'The bucket you tried to delete was not empty.' in str(e): |
+ return NotEmptyException('BucketNotEmpty (%s)' % bucket_name, |
+ status=e.status_code) |
+ return ServiceException( |
+ 'Bucket %s already exists.' % bucket_name, status=e.status_code) |
+ elif e.status_code == 412: |
+ return PreconditionException(message, status=e.status_code) |
+      elif (e.status_code == 503 and |
+            self.http.disable_ssl_certificate_validation): |
+ return ServiceException(_VALIDATE_CERTIFICATES_503_MESSAGE, |
+ status=e.status_code) |
+ return ServiceException(message, status=e.status_code) |
+ elif isinstance(e, apitools_exceptions.TransferInvalidError): |
+ return ServiceException('Transfer invalid (possible encoding error: %s)' |
+ % str(e)) |