OLD | NEW |
(Empty) | |
| 1 # -*- coding: utf-8 -*- |
| 2 # Copyright 2014 Google Inc. All Rights Reserved. |
| 3 # |
| 4 # Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 # you may not use this file except in compliance with the License. |
| 6 # You may obtain a copy of the License at |
| 7 # |
| 8 # http://www.apache.org/licenses/LICENSE-2.0 |
| 9 # |
| 10 # Unless required by applicable law or agreed to in writing, software |
| 11 # distributed under the License is distributed on an "AS IS" BASIS, |
| 12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 # See the License for the specific language governing permissions and |
| 14 # limitations under the License. |
| 15 """JSON gsutil Cloud API implementation for Google Cloud Storage.""" |
| 16 |
| 17 from __future__ import absolute_import |
| 18 |
| 19 import httplib |
| 20 import json |
| 21 import logging |
| 22 import os |
| 23 import socket |
| 24 import ssl |
| 25 import time |
| 26 import traceback |
| 27 |
| 28 from apitools.base.py import credentials_lib |
| 29 from apitools.base.py import encoding |
| 30 from apitools.base.py import exceptions as apitools_exceptions |
| 31 from apitools.base.py import http_wrapper as apitools_http_wrapper |
| 32 from apitools.base.py import transfer as apitools_transfer |
| 33 from apitools.base.py.util import CalculateWaitForRetry |
| 34 |
| 35 import boto |
| 36 from boto import config |
| 37 from gcs_oauth2_boto_plugin import oauth2_helper |
| 38 import httplib2 |
| 39 from oauth2client import devshell |
| 40 from oauth2client import multistore_file |
| 41 |
| 42 from gslib.cloud_api import AccessDeniedException |
| 43 from gslib.cloud_api import ArgumentException |
| 44 from gslib.cloud_api import BadRequestException |
| 45 from gslib.cloud_api import CloudApi |
| 46 from gslib.cloud_api import NotEmptyException |
| 47 from gslib.cloud_api import NotFoundException |
| 48 from gslib.cloud_api import PreconditionException |
| 49 from gslib.cloud_api import Preconditions |
| 50 from gslib.cloud_api import ResumableDownloadException |
| 51 from gslib.cloud_api import ResumableUploadAbortException |
| 52 from gslib.cloud_api import ResumableUploadException |
| 53 from gslib.cloud_api import ResumableUploadStartOverException |
| 54 from gslib.cloud_api import ServiceException |
| 55 from gslib.cloud_api_helper import ValidateDstObjectMetadata |
| 56 from gslib.cred_types import CredTypes |
| 57 from gslib.exception import CommandException |
| 58 from gslib.gcs_json_media import BytesTransferredContainer |
| 59 from gslib.gcs_json_media import DownloadCallbackConnectionClassFactory |
| 60 from gslib.gcs_json_media import HttpWithDownloadStream |
| 61 from gslib.gcs_json_media import HttpWithNoRetries |
| 62 from gslib.gcs_json_media import UploadCallbackConnectionClassFactory |
| 63 from gslib.gcs_json_media import WrapDownloadHttpRequest |
| 64 from gslib.gcs_json_media import WrapUploadHttpRequest |
| 65 from gslib.no_op_credentials import NoOpCredentials |
| 66 from gslib.progress_callback import ProgressCallbackWithBackoff |
| 67 from gslib.project_id import PopulateProjectId |
| 68 from gslib.third_party.storage_apitools import storage_v1_client as apitools_client
| 69 from gslib.third_party.storage_apitools import storage_v1_messages as apitools_messages
| 70 from gslib.tracker_file import DeleteTrackerFile |
| 71 from gslib.tracker_file import GetRewriteTrackerFilePath |
| 72 from gslib.tracker_file import HashRewriteParameters |
| 73 from gslib.tracker_file import ReadRewriteTrackerFile |
| 74 from gslib.tracker_file import WriteRewriteTrackerFile |
| 75 from gslib.translation_helper import CreateBucketNotFoundException |
| 76 from gslib.translation_helper import CreateObjectNotFoundException |
| 77 from gslib.translation_helper import DEFAULT_CONTENT_TYPE |
| 78 from gslib.translation_helper import REMOVE_CORS_CONFIG |
| 79 from gslib.util import GetBotoConfigFileList |
| 80 from gslib.util import GetCertsFile |
| 81 from gslib.util import GetCredentialStoreFilename |
| 82 from gslib.util import GetGceCredentialCacheFilename |
| 83 from gslib.util import GetJsonResumableChunkSize |
| 84 from gslib.util import GetMaxRetryDelay |
| 85 from gslib.util import GetNewHttp |
| 86 from gslib.util import GetNumRetries |
| 87 from gslib.util import UTF8 |
| 88 |
| 89 |
| 90 # Implementation supports only 'gs' URLs, so provider is unused. |
| 91 # pylint: disable=unused-argument |
| 92 |
| 93 DEFAULT_GCS_JSON_VERSION = 'v1' |
| 94 |
| 95 NUM_BUCKETS_PER_LIST_PAGE = 1000 |
| 96 NUM_OBJECTS_PER_LIST_PAGE = 1000 |
| 97 |
| 98 TRANSLATABLE_APITOOLS_EXCEPTIONS = (apitools_exceptions.HttpError, |
| 99 apitools_exceptions.StreamExhausted, |
| 100 apitools_exceptions.TransferError, |
| 101 apitools_exceptions.TransferInvalidError) |
| 102 |
| 103 # TODO: Distribute these exceptions better through apitools and here. |
| 104 # Right now, apitools is configured not to handle any exceptions on |
| 105 # uploads/downloads. |
| 106 # oauth2_client tries to JSON-decode the response, which can result |
| 107 # in a ValueError if the response was invalid. Until that is fixed in |
| 108 # oauth2_client, need to handle it here. |
| 109 HTTP_TRANSFER_EXCEPTIONS = (apitools_exceptions.TransferRetryError, |
| 110 apitools_exceptions.BadStatusCodeError, |
| 111 # TODO: Honor retry-after headers. |
| 112 apitools_exceptions.RetryAfterError, |
| 113 apitools_exceptions.RequestError, |
| 114 httplib.BadStatusLine, |
| 115 httplib.IncompleteRead, |
| 116 httplib.ResponseNotReady, |
| 117 httplib2.ServerNotFoundError, |
| 118 socket.error, |
| 119 socket.gaierror, |
| 120 socket.timeout, |
| 121 ssl.SSLError, |
| 122 ValueError) |
| 123 |
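A minimal sketch (not gsutil code) of how the transfer paths below use this
tuple: transient transport errors are caught, the client backs off with a
capped exponential delay via CalculateWaitForRetry (imported above), and a
bounded number of attempts is made. The attempt_fn callable is hypothetical.

    import time

    def _retry_transient(attempt_fn, num_retries, max_wait):
      # attempt_fn: hypothetical zero-argument callable performing one
      # HTTP transfer attempt; raises on failure.
      retries = 0
      while True:
        try:
          return attempt_fn()
        except HTTP_TRANSFER_EXCEPTIONS:
          retries += 1
          if retries > num_retries:
            raise  # Out of retries; surface the last transient error.
          time.sleep(CalculateWaitForRetry(retries, max_wait=max_wait))
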
| 124 _VALIDATE_CERTIFICATES_503_MESSAGE = ( |
| 125 """Service Unavailable. If you have recently changed |
| 126 https_validate_certificates from True to False in your boto configuration |
| 127 file, please delete any cached access tokens in your filesystem (at %s) |
| 128 and try again.""" % GetCredentialStoreFilename()) |
| 129 |
| 130 |
| 131 class GcsJsonApi(CloudApi): |
| 132 """Google Cloud Storage JSON implementation of gsutil Cloud API.""" |
| 133 |
| 134 def __init__(self, bucket_storage_uri_class, logger, provider=None, |
| 135 credentials=None, debug=0): |
| 136 """Performs necessary setup for interacting with Google Cloud Storage. |
| 137 |
| 138 Args: |
| 139 bucket_storage_uri_class: Unused. |
| 140 logger: logging.logger for outputting log messages. |
| 141 provider: Unused. This implementation supports only Google Cloud Storage. |
| 142 credentials: Credentials to be used for interacting with Google Cloud |
| 143 Storage. |
| 144 debug: Debug level for the API implementation (0..3). |
| 145 """ |
| 146 # TODO: Plumb host_header for perfdiag / test_perfdiag. |
| 147 # TODO: Add jitter to apitools' http_wrapper retry mechanism. |
| 148 super(GcsJsonApi, self).__init__(bucket_storage_uri_class, logger, |
| 149 provider='gs', debug=debug) |
| 150 no_op_credentials = False |
| 151 if not credentials: |
| 152 loaded_credentials = self._CheckAndGetCredentials(logger) |
| 153 |
| 154 if not loaded_credentials: |
| 155 loaded_credentials = NoOpCredentials() |
| 156 no_op_credentials = True |
| 157 else: |
| 158 if isinstance(credentials, NoOpCredentials): |
| 159 no_op_credentials = True |
| 160 |
| 161 self.credentials = credentials or loaded_credentials |
| 162 |
| 163 self.certs_file = GetCertsFile() |
| 164 |
| 165 self.http = GetNewHttp() |
| 166 self.http_base = 'https://' |
| 167 gs_json_host = config.get('Credentials', 'gs_json_host', None) |
| 168 self.host_base = gs_json_host or 'www.googleapis.com' |
| 169 |
| 170 if not gs_json_host: |
| 171 gs_host = config.get('Credentials', 'gs_host', None) |
| 172 if gs_host: |
| 173 raise ArgumentException( |
| 174 'JSON API is selected but gs_json_host is not configured, ' |
| 175 'while gs_host is configured to %s. Please also configure ' |
| 176 'gs_json_host and gs_json_port to match your desired endpoint.' |
| 177 % gs_host) |
| 178 |
| 179 gs_json_port = config.get('Credentials', 'gs_json_port', None) |
| 180 |
| 181 if not gs_json_port: |
| 182 gs_port = config.get('Credentials', 'gs_port', None) |
| 183 if gs_port: |
| 184 raise ArgumentException( |
| 185 'JSON API is selected but gs_json_port is not configured, ' |
| 186 'while gs_port is configured to %s. Please also configure ' |
| 187 'gs_json_host and gs_json_port to match your desired endpoint.' |
| 188 % gs_port) |
| 189 self.host_port = '' |
| 190 else: |
| 191 self.host_port = ':' + config.get('Credentials', 'gs_json_port') |
| 192 |
| 193 self.api_version = config.get('GSUtil', 'json_api_version', |
| 194 DEFAULT_GCS_JSON_VERSION) |
| 195 self.url_base = (self.http_base + self.host_base + self.host_port + '/' + |
| 196 'storage/' + self.api_version + '/') |
| 197 |
| 198 self.credentials.set_store( |
| 199 multistore_file.get_credential_storage_custom_string_key( |
| 200 GetCredentialStoreFilename(), self.api_version)) |
| 201 |
| 202 self.num_retries = GetNumRetries() |
| 203 |
| 204 log_request = (debug >= 3) |
| 205 log_response = (debug >= 3) |
| 206 |
| 207 self.api_client = apitools_client.StorageV1( |
| 208 url=self.url_base, http=self.http, log_request=log_request, |
| 209 log_response=log_response, credentials=self.credentials, |
| 210 version=self.api_version) |
| 211 self.api_client.num_retries = self.num_retries |
| 212 |
| 213 if no_op_credentials: |
| 214 # This API key is not secret and is used to identify gsutil during |
| 215 # anonymous requests. |
| 216 self.api_client.AddGlobalParam('key', |
| 217 u'AIzaSyDnacJHrKma0048b13sh8cgxNUwulubmJM') |
| 218 |
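For orientation, a minimal hypothetical construction of this class.
bucket_storage_uri_class is unused by this implementation, so None is
acceptable; when credentials is omitted, credentials are discovered from the
boto config as shown in _CheckAndGetCredentials below. The bucket name is
illustrative.

    import logging

    api = GcsJsonApi(None, logging.getLogger(), debug=0)
    bucket = api.GetBucket('my-bucket', fields=['location', 'versioning'])
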
| 219 def _CheckAndGetCredentials(self, logger): |
| 220 configured_cred_types = [] |
| 221 try: |
| 222 if self._HasOauth2UserAccountCreds(): |
| 223 configured_cred_types.append(CredTypes.OAUTH2_USER_ACCOUNT) |
| 224 if self._HasOauth2ServiceAccountCreds(): |
| 225 configured_cred_types.append(CredTypes.OAUTH2_SERVICE_ACCOUNT) |
| 226 if len(configured_cred_types) > 1: |
| 227 # We only allow one set of configured credentials. Otherwise, we're |
| 228 # choosing one arbitrarily, which can be very confusing to the user |
| 229 # (e.g., if only one is authorized to perform some action) and can |
| 230 # also mask errors. |
| 231 # Because boto merges config files, GCE credentials show up by default |
| 232 # for GCE VMs. We don't want to fail when a user creates a boto file |
| 233 # with their own credentials, so in this case we'll use the OAuth2 |
| 234 # user credentials. |
| 235 failed_cred_type = None |
| 236 raise CommandException( |
| 237 ('You have multiple types of configured credentials (%s), which is ' |
| 238 'not supported. One common way this happens is if you run gsutil ' |
| 239 'config to create credentials and later run gcloud auth, and ' |
| 240 'create a second set of credentials. Your boto config path is: ' |
| 241 '%s. For more help, see "gsutil help creds".') |
| 242 % (configured_cred_types, GetBotoConfigFileList())) |
| 243 |
| 244 failed_cred_type = CredTypes.OAUTH2_USER_ACCOUNT |
| 245 user_creds = self._GetOauth2UserAccountCreds() |
| 246 failed_cred_type = CredTypes.OAUTH2_SERVICE_ACCOUNT |
| 247 service_account_creds = self._GetOauth2ServiceAccountCreds() |
| 248 failed_cred_type = CredTypes.GCE |
| 249 gce_creds = self._GetGceCreds() |
| 250 failed_cred_type = CredTypes.DEVSHELL |
| 251 devshell_creds = self._GetDevshellCreds() |
| 252 return user_creds or service_account_creds or gce_creds or devshell_creds |
| 253 except: # pylint: disable=bare-except |
| 254 |
| 255 # If we didn't actually try to authenticate because there were multiple |
| 256 # types of configured credentials, don't emit this warning. |
| 257 if failed_cred_type: |
| 258 if os.environ.get('CLOUDSDK_WRAPPER') == '1': |
| 259 logger.warn( |
| 260 'Your "%s" credentials are invalid. Please run\n' |
| 261 ' $ gcloud auth login', failed_cred_type) |
| 262 else: |
| 263 logger.warn( |
| 264 'Your "%s" credentials are invalid. For more help, see ' |
| 265 '"gsutil help creds", or re-run the gsutil config command (see ' |
| 266 '"gsutil help config").', failed_cred_type) |
| 267 |
| 268 # If there's any set of configured credentials, we'll fail if they're |
| 269 # invalid, rather than silently falling back to anonymous config (as |
| 270 # boto does). That approach leads to much confusion if users don't |
| 271 # realize their credentials are invalid. |
| 272 raise |
| 273 |
| 274 def _HasOauth2ServiceAccountCreds(self): |
| 275 return config.has_option('Credentials', 'gs_service_key_file') |
| 276 |
| 277 def _HasOauth2UserAccountCreds(self): |
| 278 return config.has_option('Credentials', 'gs_oauth2_refresh_token') |
| 279 |
| 280 def _HasGceCreds(self): |
| 281 return config.has_option('GoogleCompute', 'service_account') |
| 282 |
| 283 def _GetOauth2ServiceAccountCreds(self): |
| 284 if self._HasOauth2ServiceAccountCreds(): |
| 285 return oauth2_helper.OAuth2ClientFromBotoConfig( |
| 286 boto.config, |
| 287 cred_type=CredTypes.OAUTH2_SERVICE_ACCOUNT).GetCredentials() |
| 288 |
| 289 def _GetOauth2UserAccountCreds(self): |
| 290 if self._HasOauth2UserAccountCreds(): |
| 291 return oauth2_helper.OAuth2ClientFromBotoConfig( |
| 292 boto.config).GetCredentials() |
| 293 |
| 294 def _GetGceCreds(self): |
| 295 if self._HasGceCreds(): |
| 296 try: |
| 297 return credentials_lib.GceAssertionCredentials( |
| 298 cache_filename=GetGceCredentialCacheFilename()) |
| 299 except apitools_exceptions.ResourceUnavailableError, e: |
| 300 if 'service account' in str(e) and 'does not exist' in str(e): |
| 301 return None |
| 302 raise |
| 303 |
| 304 def _GetDevshellCreds(self): |
| 305 try: |
| 306 return devshell.DevshellCredentials() |
| 307 except devshell.NoDevshellServer: |
| 308 return None |
| 309 except: |
| 310 raise |
| 311 |
| 312 def _GetNewDownloadHttp(self, download_stream): |
| 313 return GetNewHttp(http_class=HttpWithDownloadStream, stream=download_stream) |
| 314 |
| 315 def _GetNewUploadHttp(self): |
| 316 """Returns an upload-safe Http object (by disabling httplib2 retries).""" |
| 317 return GetNewHttp(http_class=HttpWithNoRetries) |
| 318 |
| 319 def GetBucket(self, bucket_name, provider=None, fields=None): |
| 320 """See CloudApi class for function doc strings.""" |
| 321 projection = (apitools_messages.StorageBucketsGetRequest |
| 322 .ProjectionValueValuesEnum.full) |
| 323 apitools_request = apitools_messages.StorageBucketsGetRequest( |
| 324 bucket=bucket_name, projection=projection) |
| 325 global_params = apitools_messages.StandardQueryParameters() |
| 326 if fields: |
| 327 global_params.fields = ','.join(set(fields)) |
| 328 |
| 329 # Here and in list buckets, we have no way of knowing |
| 330 # whether we requested a field and didn't get it because it didn't exist |
| 331 # or because we didn't have permission to access it. |
| 332 try: |
| 333 return self.api_client.buckets.Get(apitools_request, |
| 334 global_params=global_params) |
| 335 except TRANSLATABLE_APITOOLS_EXCEPTIONS, e: |
| 336 self._TranslateExceptionAndRaise(e, bucket_name=bucket_name) |
| 337 |
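A worked example of the fields handling above: the caller's iterable is
deduplicated and joined into the single comma-separated string that the JSON
API accepts as its standard fields query parameter (set order is arbitrary).

    fields = ['location', 'storageClass', 'location']
    ','.join(set(fields))  # e.g. 'storageClass,location'
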
| 338 def PatchBucket(self, bucket_name, metadata, canned_acl=None, |
| 339 canned_def_acl=None, preconditions=None, provider=None, |
| 340 fields=None): |
| 341 """See CloudApi class for function doc strings.""" |
| 342 projection = (apitools_messages.StorageBucketsPatchRequest |
| 343 .ProjectionValueValuesEnum.full) |
| 344 bucket_metadata = metadata |
| 345 |
| 346 if not preconditions: |
| 347 preconditions = Preconditions() |
| 348 |
| 349 # For blank metadata objects, we need to explicitly call |
| 350 # them out to apitools so it will send/erase them. |
| 351 apitools_include_fields = [] |
| 352 for metadata_field in ('metadata', 'lifecycle', 'logging', 'versioning', |
| 353 'website'): |
| 354 attr = getattr(bucket_metadata, metadata_field, None) |
| 355 if attr and not encoding.MessageToDict(attr): |
| 356 setattr(bucket_metadata, metadata_field, None) |
| 357 apitools_include_fields.append(metadata_field) |
| 358 |
| 359 if bucket_metadata.cors and bucket_metadata.cors == REMOVE_CORS_CONFIG: |
| 360 bucket_metadata.cors = [] |
| 361 apitools_include_fields.append('cors') |
| 362 |
| 363 predefined_acl = None |
| 364 if canned_acl: |
| 365 # Must null out existing ACLs to apply a canned ACL. |
| 366 apitools_include_fields.append('acl') |
| 367 predefined_acl = ( |
| 368 apitools_messages.StorageBucketsPatchRequest. |
| 369 PredefinedAclValueValuesEnum( |
| 370 self._BucketCannedAclToPredefinedAcl(canned_acl))) |
| 371 |
| 372 predefined_def_acl = None |
| 373 if canned_def_acl: |
| 374 # Must null out existing default object ACLs to apply a canned ACL. |
| 375 apitools_include_fields.append('defaultObjectAcl') |
| 376 predefined_def_acl = ( |
| 377 apitools_messages.StorageBucketsPatchRequest. |
| 378 PredefinedDefaultObjectAclValueValuesEnum( |
| 379 self._ObjectCannedAclToPredefinedAcl(canned_def_acl))) |
| 380 |
| 381 apitools_request = apitools_messages.StorageBucketsPatchRequest( |
| 382 bucket=bucket_name, bucketResource=bucket_metadata, |
| 383 projection=projection, |
| 384 ifMetagenerationMatch=preconditions.meta_gen_match, |
| 385 predefinedAcl=predefined_acl, |
| 386 predefinedDefaultObjectAcl=predefined_def_acl) |
| 387 global_params = apitools_messages.StandardQueryParameters() |
| 388 if fields: |
| 389 global_params.fields = ','.join(set(fields)) |
| 390 with self.api_client.IncludeFields(apitools_include_fields): |
| 391 try: |
| 392 return self.api_client.buckets.Patch(apitools_request, |
| 393 global_params=global_params) |
| 394 except TRANSLATABLE_APITOOLS_EXCEPTIONS, e: |
| 395 self._TranslateExceptionAndRaise(e) |
| 396 |
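A sketch (with hypothetical values) of the blank-metadata path above:
patching with an empty nested message causes the field to be nulled out and
named in apitools_include_fields, so apitools serializes an explicit null
that erases the bucket's server-side website configuration.

    metadata = apitools_messages.Bucket(
        website=apitools_messages.Bucket.WebsiteValue())
    # PatchBucket detects the empty message and sends '"website": null'.
    api.PatchBucket('my-bucket', metadata)
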
| 397 def CreateBucket(self, bucket_name, project_id=None, metadata=None, |
| 398 provider=None, fields=None): |
| 399 """See CloudApi class for function doc strings.""" |
| 400 projection = (apitools_messages.StorageBucketsInsertRequest |
| 401 .ProjectionValueValuesEnum.full) |
| 402 if not metadata: |
| 403 metadata = apitools_messages.Bucket() |
| 404 metadata.name = bucket_name |
| 405 |
| 406 if metadata.location: |
| 407 metadata.location = metadata.location.upper() |
| 408 if metadata.storageClass: |
| 409 metadata.storageClass = metadata.storageClass.upper() |
| 410 |
| 411 project_id = PopulateProjectId(project_id) |
| 412 |
| 413 apitools_request = apitools_messages.StorageBucketsInsertRequest( |
| 414 bucket=metadata, project=project_id, projection=projection) |
| 415 global_params = apitools_messages.StandardQueryParameters() |
| 416 if fields: |
| 417 global_params.fields = ','.join(set(fields)) |
| 418 try: |
| 419 return self.api_client.buckets.Insert(apitools_request, |
| 420 global_params=global_params) |
| 421 except TRANSLATABLE_APITOOLS_EXCEPTIONS, e: |
| 422 self._TranslateExceptionAndRaise(e, bucket_name=bucket_name) |
| 423 |
| 424 def DeleteBucket(self, bucket_name, preconditions=None, provider=None): |
| 425 """See CloudApi class for function doc strings.""" |
| 426 if not preconditions: |
| 427 preconditions = Preconditions() |
| 428 |
| 429 apitools_request = apitools_messages.StorageBucketsDeleteRequest( |
| 430 bucket=bucket_name, ifMetagenerationMatch=preconditions.meta_gen_match) |
| 431 |
| 432 try: |
| 433 self.api_client.buckets.Delete(apitools_request) |
| 434 except TRANSLATABLE_APITOOLS_EXCEPTIONS, e: |
| 435 if isinstance( |
| 436 self._TranslateApitoolsException(e, bucket_name=bucket_name), |
| 437 NotEmptyException): |
| 438 # If bucket is not empty, check to see if versioning is enabled and |
| 439 # signal that in the exception if it is. |
| 440 bucket_metadata = self.GetBucket(bucket_name, |
| 441 fields=['versioning']) |
| 442 if bucket_metadata.versioning and bucket_metadata.versioning.enabled: |
| 443 raise NotEmptyException('VersionedBucketNotEmpty', |
| 444 status=e.status_code) |
| 445 self._TranslateExceptionAndRaise(e, bucket_name=bucket_name) |
| 446 |
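A sketch (hypothetical bucket name) of handling the versioned-bucket signal
raised above; the distinct reason string lets callers prompt for removal of
noncurrent object versions before retrying the delete.

    try:
      api.DeleteBucket('my-bucket')
    except NotEmptyException as e:
      if 'VersionedBucketNotEmpty' in str(e):
        pass  # Delete noncurrent object versions first, then retry.
      else:
        raise
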
| 447 def ListBuckets(self, project_id=None, provider=None, fields=None): |
| 448 """See CloudApi class for function doc strings.""" |
| 449 projection = (apitools_messages.StorageBucketsListRequest |
| 450 .ProjectionValueValuesEnum.full) |
| 451 project_id = PopulateProjectId(project_id) |
| 452 |
| 453 apitools_request = apitools_messages.StorageBucketsListRequest( |
| 454 project=project_id, maxResults=NUM_BUCKETS_PER_LIST_PAGE, |
| 455 projection=projection) |
| 456 global_params = apitools_messages.StandardQueryParameters() |
| 457 if fields: |
| 458 if 'nextPageToken' not in fields: |
| 459 fields = list(fields) + ['nextPageToken']
| 460 global_params.fields = ','.join(set(fields)) |
| 461 try: |
| 462 bucket_list = self.api_client.buckets.List(apitools_request, |
| 463 global_params=global_params) |
| 464 except TRANSLATABLE_APITOOLS_EXCEPTIONS, e: |
| 465 self._TranslateExceptionAndRaise(e) |
| 466 |
| 467 for bucket in self._YieldBuckets(bucket_list): |
| 468 yield bucket |
| 469 |
| 470 while bucket_list.nextPageToken: |
| 471 apitools_request = apitools_messages.StorageBucketsListRequest( |
| 472 project=project_id, pageToken=bucket_list.nextPageToken, |
| 473 maxResults=NUM_BUCKETS_PER_LIST_PAGE, projection=projection) |
| 474 try: |
| 475 bucket_list = self.api_client.buckets.List(apitools_request, |
| 476 global_params=global_params) |
| 477 except TRANSLATABLE_APITOOLS_EXCEPTIONS, e: |
| 478 self._TranslateExceptionAndRaise(e) |
| 479 |
| 480 for bucket in self._YieldBuckets(bucket_list): |
| 481 yield bucket |
| 482 |
| 483 def _YieldBuckets(self, bucket_list): |
| 484 """Yields buckets from a list returned by apitools.""" |
| 485 if bucket_list.items: |
| 486 for bucket in bucket_list.items: |
| 487 yield bucket |
| 488 |
| 489 def ListObjects(self, bucket_name, prefix=None, delimiter=None, |
| 490 all_versions=None, provider=None, fields=None): |
| 491 """See CloudApi class for function doc strings.""" |
| 492 projection = (apitools_messages.StorageObjectsListRequest |
| 493 .ProjectionValueValuesEnum.full) |
| 494 apitools_request = apitools_messages.StorageObjectsListRequest( |
| 495 bucket=bucket_name, prefix=prefix, delimiter=delimiter, |
| 496 versions=all_versions, projection=projection, |
| 497 maxResults=NUM_OBJECTS_PER_LIST_PAGE) |
| 498 global_params = apitools_messages.StandardQueryParameters() |
| 499 |
| 500 if fields: |
| 501 fields = set(fields) |
| 502 if 'nextPageToken' not in fields: |
| 503 fields.add('nextPageToken') |
| 504 global_params.fields = ','.join(fields) |
| 505 |
| 506 try: |
| 507 object_list = self.api_client.objects.List(apitools_request, |
| 508 global_params=global_params) |
| 509 except TRANSLATABLE_APITOOLS_EXCEPTIONS, e: |
| 510 self._TranslateExceptionAndRaise(e, bucket_name=bucket_name) |
| 511 |
| 512 for object_or_prefix in self._YieldObjectsAndPrefixes(object_list): |
| 513 yield object_or_prefix |
| 514 |
| 515 while object_list.nextPageToken: |
| 516 apitools_request = apitools_messages.StorageObjectsListRequest( |
| 517 bucket=bucket_name, prefix=prefix, delimiter=delimiter, |
| 518 versions=all_versions, projection=projection, |
| 519 pageToken=object_list.nextPageToken, |
| 520 maxResults=NUM_OBJECTS_PER_LIST_PAGE) |
| 521 try: |
| 522 object_list = self.api_client.objects.List(apitools_request, |
| 523 global_params=global_params) |
| 524 except TRANSLATABLE_APITOOLS_EXCEPTIONS, e: |
| 525 self._TranslateExceptionAndRaise(e, bucket_name=bucket_name) |
| 526 |
| 527 for object_or_prefix in self._YieldObjectsAndPrefixes(object_list): |
| 528 yield object_or_prefix |
| 529 |
| 530 def _YieldObjectsAndPrefixes(self, object_list): |
| 531 if object_list.items: |
| 532 for cloud_obj in object_list.items: |
| 533 yield CloudApi.CsObjectOrPrefix(cloud_obj, |
| 534 CloudApi.CsObjectOrPrefixType.OBJECT) |
| 535 if object_list.prefixes: |
| 536 for prefix in object_list.prefixes: |
| 537 yield CloudApi.CsObjectOrPrefix(prefix, |
| 538 CloudApi.CsObjectOrPrefixType.PREFIX) |
| 539 |
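A usage sketch (hypothetical bucket and fields) of the listing above. Because
'nextPageToken' is force-added to any partial field set, pagination keeps
working even when the caller requests only a few object attributes, and the
CsObjectOrPrefix wrapper distinguishes objects from delimiter prefixes.

    for item in api.ListObjects('my-bucket', prefix='logs/', delimiter='/',
                                fields=['items/name', 'items/size']):
      if item.datatype == CloudApi.CsObjectOrPrefixType.OBJECT:
        name = item.data.name  # item.data is an apitools Object message
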
| 540 def GetObjectMetadata(self, bucket_name, object_name, generation=None, |
| 541 provider=None, fields=None): |
| 542 """See CloudApi class for function doc strings.""" |
| 543 projection = (apitools_messages.StorageObjectsGetRequest |
| 544 .ProjectionValueValuesEnum.full) |
| 545 |
| 546 if generation: |
| 547 generation = long(generation) |
| 548 |
| 549 apitools_request = apitools_messages.StorageObjectsGetRequest( |
| 550 bucket=bucket_name, object=object_name, projection=projection, |
| 551 generation=generation) |
| 552 global_params = apitools_messages.StandardQueryParameters() |
| 553 if fields: |
| 554 global_params.fields = ','.join(set(fields)) |
| 555 |
| 556 try: |
| 557 return self.api_client.objects.Get(apitools_request, |
| 558 global_params=global_params) |
| 559 except TRANSLATABLE_APITOOLS_EXCEPTIONS, e: |
| 560 self._TranslateExceptionAndRaise(e, bucket_name=bucket_name, |
| 561 object_name=object_name, |
| 562 generation=generation) |
| 563 |
| 564 def GetObjectMedia( |
| 565 self, bucket_name, object_name, download_stream, |
| 566 provider=None, generation=None, object_size=None, |
| 567 download_strategy=CloudApi.DownloadStrategy.ONE_SHOT, start_byte=0, |
| 568 end_byte=None, progress_callback=None, serialization_data=None, |
| 569 digesters=None): |
| 570 """See CloudApi class for function doc strings.""" |
| 571 # This implementation will get the object metadata first if we don't pass it |
| 572 # in via serialization_data. |
| 573 if generation: |
| 574 generation = long(generation) |
| 575 |
| 576 outer_total_size = object_size |
| 577 if serialization_data: |
| 578 outer_total_size = json.loads(serialization_data)['total_size'] |
| 579 |
| 580 if progress_callback: |
| 581 if outer_total_size is None: |
| 582 raise ArgumentException('Download size is required when callbacks are ' |
| 583 'requested for a download, but no size was ' |
| 584 'provided.') |
| 585 progress_callback(0, outer_total_size) |
| 586 |
| 587 bytes_downloaded_container = BytesTransferredContainer() |
| 588 bytes_downloaded_container.bytes_transferred = start_byte |
| 589 |
| 590 callback_class_factory = DownloadCallbackConnectionClassFactory( |
| 591 bytes_downloaded_container, total_size=outer_total_size, |
| 592 progress_callback=progress_callback, digesters=digesters) |
| 593 download_http_class = callback_class_factory.GetConnectionClass() |
| 594 |
| 595 download_http = self._GetNewDownloadHttp(download_stream) |
| 596 download_http.connections = {'https': download_http_class} |
| 597 authorized_download_http = self.credentials.authorize(download_http) |
| 598 WrapDownloadHttpRequest(authorized_download_http) |
| 599 |
| 600 if serialization_data: |
| 601 apitools_download = apitools_transfer.Download.FromData( |
| 602 download_stream, serialization_data, self.api_client.http, |
| 603 num_retries=self.num_retries) |
| 604 else: |
| 605 apitools_download = apitools_transfer.Download.FromStream( |
| 606 download_stream, auto_transfer=False, total_size=object_size, |
| 607 num_retries=self.num_retries) |
| 608 |
| 609 apitools_download.bytes_http = authorized_download_http |
| 610 apitools_request = apitools_messages.StorageObjectsGetRequest( |
| 611 bucket=bucket_name, object=object_name, generation=generation) |
| 612 |
| 613 try: |
| 614 if download_strategy == CloudApi.DownloadStrategy.RESUMABLE: |
| 615 # Disable retries in apitools. We will handle them explicitly here. |
| 616 apitools_download.retry_func = ( |
| 617 apitools_http_wrapper.RethrowExceptionHandler) |
| 618 return self._PerformResumableDownload( |
| 619 bucket_name, object_name, download_stream, apitools_request, |
| 620 apitools_download, bytes_downloaded_container, |
| 621 generation=generation, start_byte=start_byte, end_byte=end_byte, |
| 622 serialization_data=serialization_data) |
| 623 else: |
| 624 return self._PerformDownload( |
| 625 bucket_name, object_name, download_stream, apitools_request, |
| 626 apitools_download, generation=generation, start_byte=start_byte, |
| 627 end_byte=end_byte, serialization_data=serialization_data) |
| 628 except TRANSLATABLE_APITOOLS_EXCEPTIONS, e: |
| 629 self._TranslateExceptionAndRaise(e, bucket_name=bucket_name, |
| 630 object_name=object_name, |
| 631 generation=generation) |
| 632 |
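For reference, a sketch (hypothetical values) of the serialization_data
accepted above. It is the JSON form of an apitools download state: this
module reads 'total_size' directly, and apitools' Download.FromData expects
fields such as 'auto_transfer', 'progress', and the download session 'url'.

    serialization_data = json.dumps({
        'auto_transfer': False,
        'progress': 0,
        'total_size': 1048576,
        'url': ('https://www.googleapis.com/download/storage/v1/b/'
                'my-bucket/o/my-object?alt=media'),  # hypothetical
    })
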
| 633 def _PerformResumableDownload( |
| 634 self, bucket_name, object_name, download_stream, apitools_request, |
| 635 apitools_download, bytes_downloaded_container, generation=None, |
| 636 start_byte=0, end_byte=None, serialization_data=None): |
| 637 retries = 0 |
| 638 last_progress_byte = start_byte |
| 639 while retries <= self.num_retries: |
| 640 try: |
| 641 return self._PerformDownload( |
| 642 bucket_name, object_name, download_stream, apitools_request, |
| 643 apitools_download, generation=generation, start_byte=start_byte, |
| 644 end_byte=end_byte, serialization_data=serialization_data) |
| 645 except HTTP_TRANSFER_EXCEPTIONS, e: |
| 646 start_byte = download_stream.tell() |
| 647 bytes_downloaded_container.bytes_transferred = start_byte |
| 648 if start_byte > last_progress_byte: |
| 649 # We've made progress, so allow a fresh set of retries. |
| 650 last_progress_byte = start_byte |
| 651 retries = 0 |
| 652 retries += 1 |
| 653 if retries > self.num_retries: |
| 654 raise ResumableDownloadException( |
| 655 'Transfer failed after %d retries. Final exception: %s' % |
| 656 (self.num_retries, unicode(e).encode(UTF8))) |
| 657 time.sleep(CalculateWaitForRetry(retries, max_wait=GetMaxRetryDelay())) |
| 658 if self.logger.isEnabledFor(logging.DEBUG): |
| 659 self.logger.debug( |
| 660 'Retrying download from byte %s after exception: %s. Trace: %s', |
| 661 start_byte, unicode(e).encode(UTF8), traceback.format_exc()) |
| 662 apitools_http_wrapper.RebuildHttpConnections( |
| 663 apitools_download.bytes_http) |
| 664 |
| 665 def _PerformDownload( |
| 666 self, bucket_name, object_name, download_stream, apitools_request, |
| 667 apitools_download, generation=None, start_byte=0, end_byte=None, |
| 668 serialization_data=None): |
| 669 if not serialization_data: |
| 670 try: |
| 671 self.api_client.objects.Get(apitools_request, |
| 672 download=apitools_download) |
| 673 except TRANSLATABLE_APITOOLS_EXCEPTIONS, e: |
| 674 self._TranslateExceptionAndRaise(e, bucket_name=bucket_name, |
| 675 object_name=object_name, |
| 676 generation=generation) |
| 677 |
| 678 # Disable apitools' default print callbacks. |
| 679 def _NoOpCallback(unused_response, unused_download_object): |
| 680 pass |
| 681 |
| 682 # TODO: If we have a resumable download with accept-encoding:gzip |
| 683 # on a object that is compressible but not in gzip form in the cloud, |
| 684 # on-the-fly compression will gzip the object. In this case if our |
| 685 # download breaks, future requests will ignore the range header and just |
| 686 # return the object (gzipped) in its entirety. Ideally, we would unzip |
| 687 # the bytes that we have locally and send a range request without |
| 688 # accept-encoding:gzip so that we can download only the (uncompressed) bytes |
| 689 # that we don't yet have. |
| 690 |
| 691 # Since bytes_http is created in this function, we don't get the |
| 692 # user-agent header from api_client's http automatically. |
| 693 additional_headers = { |
| 694 'accept-encoding': 'gzip', |
| 695 'user-agent': self.api_client.user_agent |
| 696 } |
| 697 if start_byte or end_byte: |
| 698 apitools_download.GetRange(additional_headers=additional_headers, |
| 699 start=start_byte, end=end_byte, |
| 700 use_chunks=False) |
| 701 else: |
| 702 apitools_download.StreamMedia( |
| 703 callback=_NoOpCallback, finish_callback=_NoOpCallback, |
| 704 additional_headers=additional_headers, use_chunks=False) |
| 705 return apitools_download.encoding |
| 706 |
| 707 def PatchObjectMetadata(self, bucket_name, object_name, metadata, |
| 708 canned_acl=None, generation=None, preconditions=None, |
| 709 provider=None, fields=None): |
| 710 """See CloudApi class for function doc strings.""" |
| 711 projection = (apitools_messages.StorageObjectsPatchRequest |
| 712 .ProjectionValueValuesEnum.full) |
| 713 |
| 714 if not preconditions: |
| 715 preconditions = Preconditions() |
| 716 |
| 717 if generation: |
| 718 generation = long(generation) |
| 719 |
| 720 predefined_acl = None |
| 721 apitools_include_fields = [] |
| 722 if canned_acl: |
| 723 # Must null out existing ACLs to apply a canned ACL. |
| 724 apitools_include_fields.append('acl') |
| 725 predefined_acl = ( |
| 726 apitools_messages.StorageObjectsPatchRequest. |
| 727 PredefinedAclValueValuesEnum( |
| 728 self._ObjectCannedAclToPredefinedAcl(canned_acl))) |
| 729 |
| 730 apitools_request = apitools_messages.StorageObjectsPatchRequest( |
| 731 bucket=bucket_name, object=object_name, objectResource=metadata, |
| 732 generation=generation, projection=projection, |
| 733 ifGenerationMatch=preconditions.gen_match, |
| 734 ifMetagenerationMatch=preconditions.meta_gen_match, |
| 735 predefinedAcl=predefined_acl) |
| 736 global_params = apitools_messages.StandardQueryParameters() |
| 737 if fields: |
| 738 global_params.fields = ','.join(set(fields)) |
| 739 |
| 740 try: |
| 741 with self.api_client.IncludeFields(apitools_include_fields): |
| 742 return self.api_client.objects.Patch(apitools_request, |
| 743 global_params=global_params) |
| 744 except TRANSLATABLE_APITOOLS_EXCEPTIONS, e: |
| 745 self._TranslateExceptionAndRaise(e, bucket_name=bucket_name, |
| 746 object_name=object_name, |
| 747 generation=generation) |
| 748 |
| 749 def _UploadObject(self, upload_stream, object_metadata, canned_acl=None, |
| 750 size=None, preconditions=None, provider=None, fields=None, |
| 751 serialization_data=None, tracker_callback=None, |
| 752 progress_callback=None, |
| 753 apitools_strategy=apitools_transfer.SIMPLE_UPLOAD, |
| 754 total_size=0): |
| 755 # pylint: disable=g-doc-args |
| 756 """Upload implementation. Cloud API arguments, plus two more. |
| 757 |
| 758 Additional args: |
| 759 apitools_strategy: SIMPLE_UPLOAD or RESUMABLE_UPLOAD. |
| 760 total_size: Total size of the upload; None if it is unknown (streaming). |
| 761 |
| 762 Returns: |
| 763 Uploaded object metadata. |
| 764 """ |
| 765 # pylint: enable=g-doc-args |
| 766 ValidateDstObjectMetadata(object_metadata) |
| 767 predefined_acl = None |
| 768 if canned_acl: |
| 769 predefined_acl = ( |
| 770 apitools_messages.StorageObjectsInsertRequest. |
| 771 PredefinedAclValueValuesEnum( |
| 772 self._ObjectCannedAclToPredefinedAcl(canned_acl))) |
| 773 |
| 774 bytes_uploaded_container = BytesTransferredContainer() |
| 775 |
| 776 if progress_callback and size: |
| 777 total_size = size |
| 778 progress_callback(0, size) |
| 779 |
| 780 callback_class_factory = UploadCallbackConnectionClassFactory( |
| 781 bytes_uploaded_container, total_size=total_size, |
| 782 progress_callback=progress_callback) |
| 783 |
| 784 upload_http = self._GetNewUploadHttp() |
| 785 upload_http_class = callback_class_factory.GetConnectionClass() |
| 786 upload_http.connections = {'http': upload_http_class, |
| 787 'https': upload_http_class} |
| 788 |
| 789 authorized_upload_http = self.credentials.authorize(upload_http) |
| 790 WrapUploadHttpRequest(authorized_upload_http) |
| 791 # Since bytes_http is created in this function, we don't get the |
| 792 # user-agent header from api_client's http automatically. |
| 793 additional_headers = { |
| 794 'user-agent': self.api_client.user_agent |
| 795 } |
| 796 |
| 797 try: |
| 798 content_type = None |
| 799 apitools_request = None |
| 800 global_params = None |
| 801 if not serialization_data: |
| 802 # This is a new upload, set up initial upload state. |
| 803 content_type = object_metadata.contentType |
| 804 if not content_type: |
| 805 content_type = DEFAULT_CONTENT_TYPE |
| 806 |
| 807 if not preconditions: |
| 808 preconditions = Preconditions() |
| 809 |
| 810 apitools_request = apitools_messages.StorageObjectsInsertRequest( |
| 811 bucket=object_metadata.bucket, object=object_metadata, |
| 812 ifGenerationMatch=preconditions.gen_match, |
| 813 ifMetagenerationMatch=preconditions.meta_gen_match, |
| 814 predefinedAcl=predefined_acl) |
| 815 global_params = apitools_messages.StandardQueryParameters() |
| 816 if fields: |
| 817 global_params.fields = ','.join(set(fields)) |
| 818 |
| 819 if apitools_strategy == apitools_transfer.SIMPLE_UPLOAD: |
| 820 # One-shot upload. |
| 821 apitools_upload = apitools_transfer.Upload( |
| 822 upload_stream, content_type, total_size=size, auto_transfer=True, |
| 823 num_retries=self.num_retries) |
| 824 apitools_upload.strategy = apitools_strategy |
| 825 apitools_upload.bytes_http = authorized_upload_http |
| 826 |
| 827 return self.api_client.objects.Insert( |
| 828 apitools_request, |
| 829 upload=apitools_upload, |
| 830 global_params=global_params) |
| 831 else: # Resumable upload. |
| 832 return self._PerformResumableUpload( |
| 833 upload_stream, authorized_upload_http, content_type, size, |
| 834 serialization_data, apitools_strategy, apitools_request, |
| 835 global_params, bytes_uploaded_container, tracker_callback, |
| 836 additional_headers, progress_callback) |
| 837 except TRANSLATABLE_APITOOLS_EXCEPTIONS, e: |
| 838 self._TranslateExceptionAndRaise(e, bucket_name=object_metadata.bucket, |
| 839 object_name=object_metadata.name) |
| 840 |
| 841 def _PerformResumableUpload( |
| 842 self, upload_stream, authorized_upload_http, content_type, size, |
| 843 serialization_data, apitools_strategy, apitools_request, global_params, |
| 844 bytes_uploaded_container, tracker_callback, addl_headers, |
| 845 progress_callback): |
| 846 try: |
| 847 if serialization_data: |
| 848 # Resuming an existing upload. |
| 849 apitools_upload = apitools_transfer.Upload.FromData( |
| 850 upload_stream, serialization_data, self.api_client.http, |
| 851 num_retries=self.num_retries) |
| 852 apitools_upload.chunksize = GetJsonResumableChunkSize() |
| 853 apitools_upload.bytes_http = authorized_upload_http |
| 854 else: |
| 855 # New resumable upload. |
| 856 apitools_upload = apitools_transfer.Upload( |
| 857 upload_stream, content_type, total_size=size, |
| 858 chunksize=GetJsonResumableChunkSize(), auto_transfer=False, |
| 859 num_retries=self.num_retries) |
| 860 apitools_upload.strategy = apitools_strategy |
| 861 apitools_upload.bytes_http = authorized_upload_http |
| 862 self.api_client.objects.Insert( |
| 863 apitools_request, |
| 864 upload=apitools_upload, |
| 865 global_params=global_params) |
| 866 # Disable retries in apitools. We will handle them explicitly here. |
| 867 apitools_upload.retry_func = ( |
| 868 apitools_http_wrapper.RethrowExceptionHandler) |
| 869 |
| 870 # Disable apitools' default print callbacks. |
| 871 def _NoOpCallback(unused_response, unused_upload_object): |
| 872 pass |
| 873 |
| 874 # If we're resuming an upload, apitools has at this point received |
| 875 # from the server how many bytes it already has. Update our |
| 876 # callback class with this information. |
| 877 bytes_uploaded_container.bytes_transferred = apitools_upload.progress |
| 878 if tracker_callback: |
| 879 tracker_callback(json.dumps(apitools_upload.serialization_data)) |
| 880 |
| 881 retries = 0 |
| 882 last_progress_byte = apitools_upload.progress |
| 883 while retries <= self.num_retries: |
| 884 try: |
| 885 # TODO: On retry, this will seek to the bytes that the server has, |
| 886 # causing the hash to be recalculated. Make HashingFileUploadWrapper |
| 887 # save a digest according to json_resumable_chunk_size. |
| 888 if size: |
| 889 # If size is known, we can send it all in one request and avoid |
| 890 # making a round-trip per chunk. |
| 891 http_response = apitools_upload.StreamMedia( |
| 892 callback=_NoOpCallback, finish_callback=_NoOpCallback, |
| 893 additional_headers=addl_headers) |
| 894 else: |
| 895 # Otherwise it's a streaming request and we need to ensure that we |
| 896 # send the bytes in chunks so that we can guarantee that we never |
| 897 # need to seek backwards more than our buffer (and also that the |
| 898 # chunks are aligned to 256KB). |
| 899 http_response = apitools_upload.StreamInChunks( |
| 900 callback=_NoOpCallback, finish_callback=_NoOpCallback, |
| 901 additional_headers=addl_headers) |
| 902 processed_response = self.api_client.objects.ProcessHttpResponse( |
| 903 self.api_client.objects.GetMethodConfig('Insert'), http_response) |
| 904 if size is None and progress_callback: |
| 905 # Make final progress callback; total size should now be known. |
| 906 # This works around the fact that the send function counts header bytes.
| 907 # However, this will make the progress appear to go slightly |
| 908 # backwards at the end. |
| 909 progress_callback(apitools_upload.total_size, |
| 910 apitools_upload.total_size) |
| 911 return processed_response |
| 912 except HTTP_TRANSFER_EXCEPTIONS, e: |
| 913 apitools_http_wrapper.RebuildHttpConnections( |
| 914 apitools_upload.bytes_http) |
| 915 while retries <= self.num_retries: |
| 916 try: |
| 917 # TODO: Simulate the refresh case in tests. Right now, our |
| 918 # mocks are not complex enough to simulate a failure. |
| 919 apitools_upload.RefreshResumableUploadState() |
| 920 start_byte = apitools_upload.progress |
| 921 bytes_uploaded_container.bytes_transferred = start_byte |
| 922 break |
| 923 except HTTP_TRANSFER_EXCEPTIONS, e2: |
| 924 apitools_http_wrapper.RebuildHttpConnections( |
| 925 apitools_upload.bytes_http) |
| 926 retries += 1 |
| 927 if retries > self.num_retries: |
| 928 raise ResumableUploadException( |
| 929 'Transfer failed after %d retries. Final exception: %s' % |
| 930 (self.num_retries, e2)) |
| 931 time.sleep( |
| 932 CalculateWaitForRetry(retries, max_wait=GetMaxRetryDelay())) |
| 933 if start_byte > last_progress_byte: |
| 934 # We've made progress, so allow a fresh set of retries. |
| 935 last_progress_byte = start_byte |
| 936 retries = 0 |
| 937 else: |
| 938 retries += 1 |
| 939 if retries > self.num_retries: |
| 940 raise ResumableUploadException( |
| 941 'Transfer failed after %d retries. Final exception: %s' % |
| 942 (self.num_retries, unicode(e).encode(UTF8))) |
| 943 time.sleep( |
| 944 CalculateWaitForRetry(retries, max_wait=GetMaxRetryDelay())) |
| 945 if self.logger.isEnabledFor(logging.DEBUG): |
| 946 self.logger.debug( |
| 947 'Retrying upload from byte %s after exception: %s. Trace: %s', |
| 948 start_byte, unicode(e).encode(UTF8), traceback.format_exc()) |
| 949 except TRANSLATABLE_APITOOLS_EXCEPTIONS, e: |
| 950 resumable_ex = self._TranslateApitoolsResumableUploadException(e) |
| 951 if resumable_ex: |
| 952 raise resumable_ex |
| 953 else: |
| 954 raise |
| 955 |
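A sketch (hypothetical path) of the tracker_callback consumed above: it
receives the JSON-serialized apitools upload state once the upload session
exists, and persisting that string lets a later invocation resume the upload
by passing it back as serialization_data.

    def _SaveTracker(serialization_data):
      # Hypothetical location; gsutil itself uses per-transfer tracker
      # files (see gslib.tracker_file).
      with open('/tmp/upload.tracker', 'w') as fp:
        fp.write(serialization_data)
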
| 956 def UploadObject(self, upload_stream, object_metadata, canned_acl=None, |
| 957 size=None, preconditions=None, progress_callback=None, |
| 958 provider=None, fields=None): |
| 959 """See CloudApi class for function doc strings.""" |
| 960 return self._UploadObject( |
| 961 upload_stream, object_metadata, canned_acl=canned_acl, |
| 962 size=size, preconditions=preconditions, |
| 963 progress_callback=progress_callback, fields=fields, |
| 964 apitools_strategy=apitools_transfer.SIMPLE_UPLOAD) |
| 965 |
| 966 def UploadObjectStreaming(self, upload_stream, object_metadata, |
| 967 canned_acl=None, preconditions=None, |
| 968 progress_callback=None, provider=None, |
| 969 fields=None): |
| 970 """See CloudApi class for function doc strings.""" |
| 971 # Streaming indicated by not passing a size. |
| 972 # Resumable capabilities are present up to the resumable chunk size using |
| 973 # a buffered stream. |
| 974 return self._UploadObject( |
| 975 upload_stream, object_metadata, canned_acl=canned_acl, |
| 976 preconditions=preconditions, progress_callback=progress_callback, |
| 977 fields=fields, apitools_strategy=apitools_transfer.RESUMABLE_UPLOAD, |
| 978 total_size=None) |
| 979 |
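A usage sketch (hypothetical names) for the streaming case: omitting the
size routes the transfer through StreamInChunks in _PerformResumableUpload,
which sends 256 KB-aligned chunks so the stream never needs to seek
backwards beyond its buffer.

    import sys

    obj = apitools_messages.Object(bucket='my-bucket', name='stream.bin')
    api.UploadObjectStreaming(sys.stdin, obj, canned_acl='project-private')
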
| 980 def UploadObjectResumable( |
| 981 self, upload_stream, object_metadata, canned_acl=None, preconditions=None, |
| 982 provider=None, fields=None, size=None, serialization_data=None, |
| 983 tracker_callback=None, progress_callback=None): |
| 984 """See CloudApi class for function doc strings.""" |
| 985 return self._UploadObject( |
| 986 upload_stream, object_metadata, canned_acl=canned_acl, |
| 987 preconditions=preconditions, fields=fields, size=size, |
| 988 serialization_data=serialization_data, |
| 989 tracker_callback=tracker_callback, progress_callback=progress_callback, |
| 990 apitools_strategy=apitools_transfer.RESUMABLE_UPLOAD) |
| 991 |
| 992 def CopyObject(self, src_obj_metadata, dst_obj_metadata, src_generation=None, |
| 993 canned_acl=None, preconditions=None, progress_callback=None, |
| 994 max_bytes_per_call=None, provider=None, fields=None): |
| 995 """See CloudApi class for function doc strings.""" |
| 996 ValidateDstObjectMetadata(dst_obj_metadata) |
| 997 predefined_acl = None |
| 998 if canned_acl: |
| 999 predefined_acl = ( |
| 1000 apitools_messages.StorageObjectsRewriteRequest. |
| 1001 DestinationPredefinedAclValueValuesEnum( |
| 1002 self._ObjectCannedAclToPredefinedAcl(canned_acl))) |
| 1003 |
| 1004 if src_generation: |
| 1005 src_generation = long(src_generation) |
| 1006 |
| 1007 if not preconditions: |
| 1008 preconditions = Preconditions() |
| 1009 |
| 1010 projection = (apitools_messages.StorageObjectsRewriteRequest. |
| 1011 ProjectionValueValuesEnum.full) |
| 1012 global_params = apitools_messages.StandardQueryParameters() |
| 1013 if fields: |
| 1014 # Rewrite returns the resultant object under the 'resource' field. |
| 1015 new_fields = set(['done', 'objectSize', 'rewriteToken', |
| 1016 'totalBytesRewritten']) |
| 1017 for field in fields: |
| 1018 new_fields.add('resource/' + field) |
| 1019 global_params.fields = ','.join(set(new_fields)) |
| 1020 |
| 1021 # Check to see if we are resuming a rewrite. |
| 1022 tracker_file_name = GetRewriteTrackerFilePath( |
| 1023 src_obj_metadata.bucket, src_obj_metadata.name, dst_obj_metadata.bucket, |
| 1024 dst_obj_metadata.name, 'JSON') |
| 1025 rewrite_params_hash = HashRewriteParameters( |
| 1026 src_obj_metadata, dst_obj_metadata, projection, |
| 1027 src_generation=src_generation, gen_match=preconditions.gen_match, |
| 1028 meta_gen_match=preconditions.meta_gen_match, |
| 1029 canned_acl=predefined_acl, fields=global_params.fields, |
| 1030 max_bytes_per_call=max_bytes_per_call) |
| 1031 resume_rewrite_token = ReadRewriteTrackerFile(tracker_file_name, |
| 1032 rewrite_params_hash) |
| 1033 |
| 1034 progress_cb_with_backoff = None |
| 1035 try: |
| 1036 last_bytes_written = 0L |
| 1037 while True: |
| 1038 apitools_request = apitools_messages.StorageObjectsRewriteRequest( |
| 1039 sourceBucket=src_obj_metadata.bucket, |
| 1040 sourceObject=src_obj_metadata.name, |
| 1041 destinationBucket=dst_obj_metadata.bucket, |
| 1042 destinationObject=dst_obj_metadata.name, |
| 1043 projection=projection, object=dst_obj_metadata, |
| 1044 sourceGeneration=src_generation, |
| 1045 ifGenerationMatch=preconditions.gen_match, |
| 1046 ifMetagenerationMatch=preconditions.meta_gen_match, |
| 1047 destinationPredefinedAcl=predefined_acl, |
| 1048 rewriteToken=resume_rewrite_token, |
| 1049 maxBytesRewrittenPerCall=max_bytes_per_call) |
| 1050 rewrite_response = self.api_client.objects.Rewrite( |
| 1051 apitools_request, global_params=global_params) |
| 1052 bytes_written = long(rewrite_response.totalBytesRewritten) |
| 1053 if progress_callback and not progress_cb_with_backoff: |
| 1054 progress_cb_with_backoff = ProgressCallbackWithBackoff( |
| 1055 long(rewrite_response.objectSize), progress_callback) |
| 1056 if progress_cb_with_backoff: |
| 1057 progress_cb_with_backoff.Progress( |
| 1058 bytes_written - last_bytes_written) |
| 1059 |
| 1060 if rewrite_response.done: |
| 1061 break |
| 1062 elif not resume_rewrite_token: |
| 1063 # Save the token and make a tracker file if they don't already exist. |
| 1064 resume_rewrite_token = rewrite_response.rewriteToken |
| 1065 WriteRewriteTrackerFile(tracker_file_name, rewrite_params_hash, |
| 1066 rewrite_response.rewriteToken) |
| 1067 last_bytes_written = bytes_written |
| 1068 |
| 1069 DeleteTrackerFile(tracker_file_name) |
| 1070 return rewrite_response.resource |
| 1071 except TRANSLATABLE_APITOOLS_EXCEPTIONS, e: |
| 1072 self._TranslateExceptionAndRaise(e, bucket_name=dst_obj_metadata.bucket, |
| 1073 object_name=dst_obj_metadata.name) |
| 1074 |
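A usage sketch (hypothetical names) of the rewrite loop above. With
maxBytesRewrittenPerCall set, a large cross-location copy spans several
Rewrite calls, and the rewrite token written to the tracker file lets an
interrupted copy resume where it left off.

    src = apitools_messages.Object(bucket='src-bucket', name='big.bin')
    dst = apitools_messages.Object(bucket='dst-bucket', name='big.bin')
    api.CopyObject(src, dst, max_bytes_per_call=512 * 1024 * 1024)
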
| 1075 def DeleteObject(self, bucket_name, object_name, preconditions=None, |
| 1076 generation=None, provider=None): |
| 1077 """See CloudApi class for function doc strings.""" |
| 1078 if not preconditions: |
| 1079 preconditions = Preconditions() |
| 1080 |
| 1081 if generation: |
| 1082 generation = long(generation) |
| 1083 |
| 1084 apitools_request = apitools_messages.StorageObjectsDeleteRequest( |
| 1085 bucket=bucket_name, object=object_name, generation=generation, |
| 1086 ifGenerationMatch=preconditions.gen_match, |
| 1087 ifMetagenerationMatch=preconditions.meta_gen_match) |
| 1088 try: |
| 1089 return self.api_client.objects.Delete(apitools_request) |
| 1090 except TRANSLATABLE_APITOOLS_EXCEPTIONS, e: |
| 1091 self._TranslateExceptionAndRaise(e, bucket_name=bucket_name, |
| 1092 object_name=object_name, |
| 1093 generation=generation) |
| 1094 |
| 1095 def ComposeObject(self, src_objs_metadata, dst_obj_metadata, |
| 1096 preconditions=None, provider=None, fields=None): |
| 1097 """See CloudApi class for function doc strings.""" |
| 1098 ValidateDstObjectMetadata(dst_obj_metadata) |
| 1099 |
| 1100 dst_obj_name = dst_obj_metadata.name |
| 1101 dst_obj_metadata.name = None |
| 1102 dst_bucket_name = dst_obj_metadata.bucket |
| 1103 dst_obj_metadata.bucket = None |
| 1104 if not dst_obj_metadata.contentType: |
| 1105 dst_obj_metadata.contentType = DEFAULT_CONTENT_TYPE |
| 1106 |
| 1107 if not preconditions: |
| 1108 preconditions = Preconditions() |
| 1109 |
| 1110 global_params = apitools_messages.StandardQueryParameters() |
| 1111 if fields: |
| 1112 global_params.fields = ','.join(set(fields)) |
| 1113 |
| 1114 src_objs_compose_request = apitools_messages.ComposeRequest( |
| 1115 sourceObjects=src_objs_metadata, destination=dst_obj_metadata) |
| 1116 |
| 1117 apitools_request = apitools_messages.StorageObjectsComposeRequest( |
| 1118 composeRequest=src_objs_compose_request, |
| 1119 destinationBucket=dst_bucket_name, |
| 1120 destinationObject=dst_obj_name, |
| 1121 ifGenerationMatch=preconditions.gen_match, |
| 1122 ifMetagenerationMatch=preconditions.meta_gen_match) |
| 1123 try: |
| 1124 return self.api_client.objects.Compose(apitools_request, |
| 1125 global_params=global_params) |
| 1126 except TRANSLATABLE_APITOOLS_EXCEPTIONS, e: |
| 1127 # We can't be sure which object was missing in the 404 case. |
| 1128 if isinstance(e, apitools_exceptions.HttpError) and e.status_code == 404: |
| 1129 raise NotFoundException('One of the source objects does not exist.') |
| 1130 else: |
| 1131 self._TranslateExceptionAndRaise(e) |
| 1132 |
| 1133 def WatchBucket(self, bucket_name, address, channel_id, token=None, |
| 1134 provider=None, fields=None): |
| 1135 """See CloudApi class for function doc strings.""" |
| 1136 projection = (apitools_messages.StorageObjectsWatchAllRequest |
| 1137 .ProjectionValueValuesEnum.full) |
| 1138 |
| 1139 channel = apitools_messages.Channel(address=address, id=channel_id, |
| 1140 token=token, type='WEB_HOOK') |
| 1141 |
| 1142 apitools_request = apitools_messages.StorageObjectsWatchAllRequest( |
| 1143 bucket=bucket_name, channel=channel, projection=projection) |
| 1144 |
| 1145 global_params = apitools_messages.StandardQueryParameters() |
| 1146 if fields: |
| 1147 global_params.fields = ','.join(set(fields)) |
| 1148 |
| 1149 try: |
| 1150 return self.api_client.objects.WatchAll(apitools_request, |
| 1151 global_params=global_params) |
| 1152 except TRANSLATABLE_APITOOLS_EXCEPTIONS, e: |
| 1153 self._TranslateExceptionAndRaise(e, bucket_name=bucket_name) |
| 1154 |
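A usage sketch (hypothetical endpoint) of the watch call above: GCS posts
object-change events to the given HTTPS address, echoing the channel id and
optional client token.

    channel = api.WatchBucket('my-bucket', 'https://example.com/notify',
                              'my-channel-id', token='my-client-token')
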
| 1155 def StopChannel(self, channel_id, resource_id, provider=None): |
| 1156 """See CloudApi class for function doc strings.""" |
| 1157 channel = apitools_messages.Channel(id=channel_id, resourceId=resource_id) |
| 1158 try: |
| 1159 self.api_client.channels.Stop(channel) |
| 1160 except TRANSLATABLE_APITOOLS_EXCEPTIONS, e: |
| 1161 self._TranslateExceptionAndRaise(e) |
| 1162 |
| 1163 def _BucketCannedAclToPredefinedAcl(self, canned_acl_string): |
| 1164 """Translates the input string to a bucket PredefinedAcl string. |
| 1165 |
| 1166 Args: |
| 1167 canned_acl_string: Canned ACL string. |
| 1168 |
| 1169 Returns: |
| 1170 String that can be used as a query parameter with the JSON API. This |
| 1171 corresponds to a flavor of *PredefinedAclValueValuesEnum and can be |
| 1172 used as input to apitools requests that affect bucket access controls. |
| 1173 """ |
| 1174 # XML : JSON |
| 1175 translation_dict = { |
| 1176 None: None, |
| 1177 'authenticated-read': 'authenticatedRead', |
| 1178 'private': 'private', |
| 1179 'project-private': 'projectPrivate', |
| 1180 'public-read': 'publicRead', |
| 1181 'public-read-write': 'publicReadWrite' |
| 1182 } |
| 1183 if canned_acl_string in translation_dict: |
| 1184 return translation_dict[canned_acl_string] |
| 1185 raise ArgumentException('Invalid canned ACL %s' % canned_acl_string) |
| 1186 |
| 1187 def _ObjectCannedAclToPredefinedAcl(self, canned_acl_string): |
| 1188 """Translates the input string to an object PredefinedAcl string. |
| 1189 |
| 1190 Args: |
| 1191 canned_acl_string: Canned ACL string. |
| 1192 |
| 1193 Returns: |
| 1194 String that can be used as a query parameter with the JSON API. This |
| 1195 corresponds to a flavor of *PredefinedAclValueValuesEnum and can be |
| 1196 used as input to apitools requests that affect object access controls. |
| 1197 """ |
| 1198 # XML : JSON |
| 1199 translation_dict = { |
| 1200 None: None, |
| 1201 'authenticated-read': 'authenticatedRead', |
| 1202 'bucket-owner-read': 'bucketOwnerRead', |
| 1203 'bucket-owner-full-control': 'bucketOwnerFullControl', |
| 1204 'private': 'private', |
| 1205 'project-private': 'projectPrivate', |
| 1206 'public-read': 'publicRead' |
| 1207 } |
| 1208 if canned_acl_string in translation_dict: |
| 1209 return translation_dict[canned_acl_string] |
| 1210 raise ArgumentException('Invalid canned ACL %s' % canned_acl_string) |
| 1211 |
| 1212 def _TranslateExceptionAndRaise(self, e, bucket_name=None, object_name=None, |
| 1213 generation=None): |
| 1214 """Translates an HTTP exception and raises the translated or original value. |
| 1215 |
| 1216 Args: |
| 1217 e: Any Exception. |
| 1218 bucket_name: Optional bucket name in request that caused the exception. |
| 1219 object_name: Optional object name in request that caused the exception. |
| 1220 generation: Optional generation in request that caused the exception. |
| 1221 |
| 1222 Raises: |
| 1223 Translated CloudApi exception, or the original exception if it was not |
| 1224 translatable. |
| 1225 """ |
| 1226 translated_exception = self._TranslateApitoolsException( |
| 1227 e, bucket_name=bucket_name, object_name=object_name, |
| 1228 generation=generation) |
| 1229 if translated_exception: |
| 1230 raise translated_exception |
| 1231 else: |
| 1232 raise |
| 1233 |
| 1234 def _GetMessageFromHttpError(self, http_error): |
| 1235 if isinstance(http_error, apitools_exceptions.HttpError): |
| 1236 if getattr(http_error, 'content', None): |
| 1237 try: |
| 1238 json_obj = json.loads(http_error.content) |
| 1239 if 'error' in json_obj and 'message' in json_obj['error']: |
| 1240 return json_obj['error']['message'] |
| 1241 except Exception: # pylint: disable=broad-except |
| 1242 # If we couldn't decode anything, just leave the message as None. |
| 1243 pass |
| 1244 |
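For reference, the JSON API error envelope this helper parses, shown with
hypothetical content; anything that fails to decode simply yields a None
message.

    error_body = json.dumps(
        {'error': {'code': 403, 'message': 'Forbidden',
                   'errors': [{'reason': 'forbidden'}]}})
    json.loads(error_body)['error']['message']  # -> 'Forbidden'
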
| 1245 def _TranslateApitoolsResumableUploadException( |
| 1246 self, e, bucket_name=None, object_name=None, generation=None): |
| 1247 if isinstance(e, apitools_exceptions.HttpError): |
| 1248 message = self._GetMessageFromHttpError(e) |
| 1249 if (e.status_code == 503 and |
| 1250 self.http.disable_ssl_certificate_validation): |
| 1251 return ServiceException(_VALIDATE_CERTIFICATES_503_MESSAGE, |
| 1252 status=e.status_code) |
| 1253 elif e.status_code >= 500: |
| 1254 return ResumableUploadException( |
| 1255 message or 'Server Error', status=e.status_code) |
| 1256 elif e.status_code == 429: |
| 1257 return ResumableUploadException( |
| 1258 message or 'Too Many Requests', status=e.status_code) |
| 1259 elif e.status_code == 410: |
| 1260 return ResumableUploadStartOverException( |
| 1261 message or 'Bad Request', status=e.status_code) |
| 1262 elif e.status_code == 404: |
| 1263 return ResumableUploadStartOverException( |
| 1264 message or 'Bad Request', status=e.status_code) |
| 1265 elif e.status_code >= 400: |
| 1266 return ResumableUploadAbortException( |
| 1267 message or 'Bad Request', status=e.status_code) |
| 1268 if isinstance(e, apitools_exceptions.StreamExhausted): |
| 1269 return ResumableUploadAbortException(e.message) |
| 1270 if (isinstance(e, apitools_exceptions.TransferError) and |
| 1271 ('Aborting transfer' in e.message or |
| 1272 'Not enough bytes in stream' in e.message or |
| 1273 'additional bytes left in stream' in e.message)): |
| 1274 return ResumableUploadAbortException(e.message) |
| 1275 |
| 1276 def _TranslateApitoolsException(self, e, bucket_name=None, object_name=None, |
| 1277 generation=None): |
| 1278 """Translates apitools exceptions into their gsutil Cloud Api equivalents. |
| 1279 |
| 1280 Args: |
| 1281 e: Any exception in TRANSLATABLE_APITOOLS_EXCEPTIONS. |
| 1282 bucket_name: Optional bucket name in request that caused the exception. |
| 1283 object_name: Optional object name in request that caused the exception. |
| 1284 generation: Optional generation in request that caused the exception. |
| 1285 |
| 1286 Returns: |
| 1287 CloudStorageApiServiceException for translatable exceptions, None |
| 1288 otherwise. |
| 1289 """ |
| 1290 if isinstance(e, apitools_exceptions.HttpError): |
| 1291 message = self._GetMessageFromHttpError(e) |
| 1292 if e.status_code == 400: |
| 1293 # It is possible that the Project ID is incorrect. Unfortunately the |
| 1294 # JSON API does not give us much information about what part of the |
| 1295 # request was bad. |
| 1296 return BadRequestException(message or 'Bad Request', |
| 1297 status=e.status_code) |
| 1298 elif e.status_code == 401: |
| 1299 if 'Login Required' in str(e): |
| 1300 return AccessDeniedException( |
| 1301 message or 'Access denied: login required.', |
| 1302 status=e.status_code) |
| 1303 elif e.status_code == 403: |
| 1304 if 'The account for the specified project has been disabled' in str(e): |
| 1305 return AccessDeniedException(message or 'Account disabled.', |
| 1306 status=e.status_code) |
| 1307 elif 'Daily Limit for Unauthenticated Use Exceeded' in str(e): |
| 1308 return AccessDeniedException( |
| 1309 message or 'Access denied: quota exceeded. ' |
| 1310 'Is your project ID valid?', |
| 1311 status=e.status_code) |
| 1312 elif 'The bucket you tried to delete was not empty.' in str(e): |
| 1313 return NotEmptyException('BucketNotEmpty (%s)' % bucket_name, |
| 1314 status=e.status_code) |
| 1315 elif ('The bucket you tried to create requires domain ownership ' |
| 1316 'verification.' in str(e)): |
| 1317 return AccessDeniedException( |
| 1318 'The bucket you tried to create requires domain ownership ' |
| 1319 'verification. Please see ' |
| 1320 'https://developers.google.com/storage/docs/bucketnaming' |
| 1321 '?hl=en#verification for more details.', status=e.status_code) |
| 1322 elif 'User Rate Limit Exceeded' in str(e): |
| 1323 return AccessDeniedException('Rate limit exceeded. Please retry this ' |
| 1324 'request later.', status=e.status_code) |
| 1325 elif 'Access Not Configured' in str(e): |
| 1326 return AccessDeniedException( |
| 1327 'Access Not Configured. Please go to the Google Developers ' |
| 1328 'Console (https://cloud.google.com/console#/project) for your ' |
| 1329 'project, select APIs and Auth and enable the ' |
| 1330 'Google Cloud Storage JSON API.', |
| 1331 status=e.status_code) |
| 1332 else: |
| 1333 return AccessDeniedException(message or e.message, |
| 1334 status=e.status_code) |
| 1335 elif e.status_code == 404: |
| 1336 if bucket_name: |
| 1337 if object_name: |
| 1338 return CreateObjectNotFoundException(e.status_code, self.provider, |
| 1339 bucket_name, object_name, |
| 1340 generation=generation) |
| 1341 return CreateBucketNotFoundException(e.status_code, self.provider, |
| 1342 bucket_name) |
| 1343 return NotFoundException(e.message, status=e.status_code) |
| 1344 elif e.status_code == 409 and bucket_name: |
| 1345 if 'The bucket you tried to delete was not empty.' in str(e): |
| 1346 return NotEmptyException('BucketNotEmpty (%s)' % bucket_name, |
| 1347 status=e.status_code) |
| 1348 return ServiceException( |
| 1349 'Bucket %s already exists.' % bucket_name, status=e.status_code) |
| 1350 elif e.status_code == 412: |
| 1351 return PreconditionException(message, status=e.status_code) |
| 1352 elif (e.status_code == 503 and |
| 1353 self.http.disable_ssl_certificate_validation):
| 1354 return ServiceException(_VALIDATE_CERTIFICATES_503_MESSAGE, |
| 1355 status=e.status_code) |
| 1356 return ServiceException(message, status=e.status_code) |
| 1357 elif isinstance(e, apitools_exceptions.TransferInvalidError): |
| 1358 return ServiceException('Transfer invalid (possible encoding error: %s)' |
| 1359 % str(e)) |