# -*- coding: utf-8 -*-
# Copyright 2014 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""JSON gsutil Cloud API implementation for Google Cloud Storage."""

from __future__ import absolute_import

import httplib
import json
import logging
import os
import socket
import ssl
import time
import traceback

from apitools.base.py import credentials_lib
from apitools.base.py import encoding
from apitools.base.py import exceptions as apitools_exceptions
from apitools.base.py import http_wrapper as apitools_http_wrapper
from apitools.base.py import transfer as apitools_transfer
from apitools.base.py.util import CalculateWaitForRetry

import boto
from boto import config
from gcs_oauth2_boto_plugin import oauth2_helper
import httplib2
from oauth2client import devshell
from oauth2client import multistore_file

from gslib.cloud_api import AccessDeniedException
from gslib.cloud_api import ArgumentException
from gslib.cloud_api import BadRequestException
from gslib.cloud_api import CloudApi
from gslib.cloud_api import NotEmptyException
from gslib.cloud_api import NotFoundException
from gslib.cloud_api import PreconditionException
from gslib.cloud_api import Preconditions
from gslib.cloud_api import ResumableDownloadException
from gslib.cloud_api import ResumableUploadAbortException
from gslib.cloud_api import ResumableUploadException
from gslib.cloud_api import ResumableUploadStartOverException
from gslib.cloud_api import ServiceException
from gslib.cloud_api_helper import ValidateDstObjectMetadata
from gslib.cred_types import CredTypes
from gslib.exception import CommandException
from gslib.gcs_json_media import BytesTransferredContainer
from gslib.gcs_json_media import DownloadCallbackConnectionClassFactory
from gslib.gcs_json_media import HttpWithDownloadStream
from gslib.gcs_json_media import HttpWithNoRetries
from gslib.gcs_json_media import UploadCallbackConnectionClassFactory
from gslib.gcs_json_media import WrapDownloadHttpRequest
from gslib.gcs_json_media import WrapUploadHttpRequest
from gslib.no_op_credentials import NoOpCredentials
from gslib.progress_callback import ProgressCallbackWithBackoff
from gslib.project_id import PopulateProjectId
from gslib.third_party.storage_apitools import storage_v1_client as apitools_client
from gslib.third_party.storage_apitools import storage_v1_messages as apitools_messages
from gslib.tracker_file import DeleteTrackerFile
from gslib.tracker_file import GetRewriteTrackerFilePath
from gslib.tracker_file import HashRewriteParameters
from gslib.tracker_file import ReadRewriteTrackerFile
from gslib.tracker_file import WriteRewriteTrackerFile
from gslib.translation_helper import CreateBucketNotFoundException
from gslib.translation_helper import CreateObjectNotFoundException
from gslib.translation_helper import DEFAULT_CONTENT_TYPE
from gslib.translation_helper import REMOVE_CORS_CONFIG
from gslib.util import GetBotoConfigFileList
from gslib.util import GetCertsFile
from gslib.util import GetCredentialStoreFilename
from gslib.util import GetGceCredentialCacheFilename
from gslib.util import GetJsonResumableChunkSize
from gslib.util import GetMaxRetryDelay
from gslib.util import GetNewHttp
from gslib.util import GetNumRetries
from gslib.util import UTF8


# Implementation supports only 'gs' URLs, so provider is unused.
# pylint: disable=unused-argument

DEFAULT_GCS_JSON_VERSION = 'v1'

NUM_BUCKETS_PER_LIST_PAGE = 1000
NUM_OBJECTS_PER_LIST_PAGE = 1000

TRANSLATABLE_APITOOLS_EXCEPTIONS = (apitools_exceptions.HttpError,
                                    apitools_exceptions.StreamExhausted,
                                    apitools_exceptions.TransferError,
                                    apitools_exceptions.TransferInvalidError)

# TODO: Distribute these exceptions better through apitools and here.
# Right now, apitools is configured not to handle any exceptions on
# uploads/downloads.
# oauth2_client tries to JSON-decode the response, which can result
# in a ValueError if the response was invalid. Until that is fixed in
# oauth2_client, need to handle it here.
HTTP_TRANSFER_EXCEPTIONS = (apitools_exceptions.TransferRetryError,
                            apitools_exceptions.BadStatusCodeError,
                            # TODO: Honor retry-after headers.
                            apitools_exceptions.RetryAfterError,
                            apitools_exceptions.RequestError,
                            httplib.BadStatusLine,
                            httplib.IncompleteRead,
                            httplib.ResponseNotReady,
                            httplib2.ServerNotFoundError,
                            socket.error,
                            socket.gaierror,
                            socket.timeout,
                            ssl.SSLError,
                            ValueError)

_VALIDATE_CERTIFICATES_503_MESSAGE = (
    """Service Unavailable. If you have recently changed
https_validate_certificates from True to False in your boto configuration
file, please delete any cached access tokens in your filesystem (at %s)
and try again.""" % GetCredentialStoreFilename())


class GcsJsonApi(CloudApi):
  """Google Cloud Storage JSON implementation of gsutil Cloud API."""

  def __init__(self, bucket_storage_uri_class, logger, provider=None,
               credentials=None, debug=0):
    """Performs necessary setup for interacting with Google Cloud Storage.

    Args:
      bucket_storage_uri_class: Unused.
      logger: logging.logger for outputting log messages.
      provider: Unused. This implementation supports only Google Cloud Storage.
      credentials: Credentials to be used for interacting with Google Cloud
                   Storage.
      debug: Debug level for the API implementation (0..3).
    """
    # TODO: Plumb host_header for perfdiag / test_perfdiag.
    # TODO: Add jitter to apitools' http_wrapper retry mechanism.
    super(GcsJsonApi, self).__init__(bucket_storage_uri_class, logger,
                                     provider='gs', debug=debug)
    no_op_credentials = False
    if not credentials:
      loaded_credentials = self._CheckAndGetCredentials(logger)

      if not loaded_credentials:
        loaded_credentials = NoOpCredentials()
        no_op_credentials = True
    else:
      if isinstance(credentials, NoOpCredentials):
        no_op_credentials = True

    self.credentials = credentials or loaded_credentials

    self.certs_file = GetCertsFile()

    self.http = GetNewHttp()
    self.http_base = 'https://'
    gs_json_host = config.get('Credentials', 'gs_json_host', None)
    self.host_base = gs_json_host or 'www.googleapis.com'

    if not gs_json_host:
      gs_host = config.get('Credentials', 'gs_host', None)
      if gs_host:
        raise ArgumentException(
            'JSON API is selected but gs_json_host is not configured, '
            'while gs_host is configured to %s. Please also configure '
            'gs_json_host and gs_json_port to match your desired endpoint.'
            % gs_host)

    gs_json_port = config.get('Credentials', 'gs_json_port', None)

    if not gs_json_port:
      gs_port = config.get('Credentials', 'gs_port', None)
      if gs_port:
        raise ArgumentException(
            'JSON API is selected but gs_json_port is not configured, '
            'while gs_port is configured to %s. Please also configure '
            'gs_json_host and gs_json_port to match your desired endpoint.'
            % gs_port)
      self.host_port = ''
    else:
      self.host_port = ':' + config.get('Credentials', 'gs_json_port')

    self.api_version = config.get('GSUtil', 'json_api_version',
                                  DEFAULT_GCS_JSON_VERSION)
    self.url_base = (self.http_base + self.host_base + self.host_port + '/' +
                     'storage/' + self.api_version + '/')

    self.credentials.set_store(
        multistore_file.get_credential_storage_custom_string_key(
            GetCredentialStoreFilename(), self.api_version))

    self.num_retries = GetNumRetries()

    log_request = (debug >= 3)
    log_response = (debug >= 3)

    self.api_client = apitools_client.StorageV1(
        url=self.url_base, http=self.http, log_request=log_request,
        log_response=log_response, credentials=self.credentials,
        version=self.api_version)
    self.api_client.num_retries = self.num_retries

    if no_op_credentials:
      # This API key is not secret and is used to identify gsutil during
      # anonymous requests.
      self.api_client.AddGlobalParam('key',
                                     u'AIzaSyDnacJHrKma0048b13sh8cgxNUwulubmJM')

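  # Only OAuth2 user-account and service-account credentials count as
  # "configured" below; having both raises CommandException. Otherwise the
  # first credential type that loads wins, in the order: OAuth2 user account,
  # OAuth2 service account, GCE, devshell.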
  def _CheckAndGetCredentials(self, logger):
    configured_cred_types = []
    try:
      if self._HasOauth2UserAccountCreds():
        configured_cred_types.append(CredTypes.OAUTH2_USER_ACCOUNT)
      if self._HasOauth2ServiceAccountCreds():
        configured_cred_types.append(CredTypes.OAUTH2_SERVICE_ACCOUNT)
      if len(configured_cred_types) > 1:
        # We only allow one set of configured credentials. Otherwise, we're
        # choosing one arbitrarily, which can be very confusing to the user
        # (e.g., if only one is authorized to perform some action) and can
        # also mask errors.
        # Because boto merges config files, GCE credentials show up by default
        # for GCE VMs. We don't want to fail when a user creates a boto file
        # with their own credentials, so in this case we'll use the OAuth2
        # user credentials.
        failed_cred_type = None
        raise CommandException(
            ('You have multiple types of configured credentials (%s), which is '
             'not supported. One common way this happens is if you run gsutil '
             'config to create credentials and later run gcloud auth, and '
             'create a second set of credentials. Your boto config path is: '
             '%s. For more help, see "gsutil help creds".')
            % (configured_cred_types, GetBotoConfigFileList()))

      failed_cred_type = CredTypes.OAUTH2_USER_ACCOUNT
      user_creds = self._GetOauth2UserAccountCreds()
      failed_cred_type = CredTypes.OAUTH2_SERVICE_ACCOUNT
      service_account_creds = self._GetOauth2ServiceAccountCreds()
      failed_cred_type = CredTypes.GCE
      gce_creds = self._GetGceCreds()
      failed_cred_type = CredTypes.DEVSHELL
      devshell_creds = self._GetDevshellCreds()
      return user_creds or service_account_creds or gce_creds or devshell_creds
    except:  # pylint: disable=bare-except

      # If we didn't actually try to authenticate because there were multiple
      # types of configured credentials, don't emit this warning.
      if failed_cred_type:
        if os.environ.get('CLOUDSDK_WRAPPER') == '1':
          logger.warn(
              'Your "%s" credentials are invalid. Please run\n'
              '  $ gcloud auth login', failed_cred_type)
        else:
          logger.warn(
              'Your "%s" credentials are invalid. For more help, see '
              '"gsutil help creds", or re-run the gsutil config command (see '
              '"gsutil help config").', failed_cred_type)

      # If there's any set of configured credentials, we'll fail if they're
      # invalid, rather than silently falling back to anonymous config (as
      # boto does). That approach leads to much confusion if users don't
      # realize their credentials are invalid.
      raise

  def _HasOauth2ServiceAccountCreds(self):
    return config.has_option('Credentials', 'gs_service_key_file')

  def _HasOauth2UserAccountCreds(self):
    return config.has_option('Credentials', 'gs_oauth2_refresh_token')

  def _HasGceCreds(self):
    return config.has_option('GoogleCompute', 'service_account')

  def _GetOauth2ServiceAccountCreds(self):
    if self._HasOauth2ServiceAccountCreds():
      return oauth2_helper.OAuth2ClientFromBotoConfig(
          boto.config,
          cred_type=CredTypes.OAUTH2_SERVICE_ACCOUNT).GetCredentials()

  def _GetOauth2UserAccountCreds(self):
    if self._HasOauth2UserAccountCreds():
      return oauth2_helper.OAuth2ClientFromBotoConfig(
          boto.config).GetCredentials()

  def _GetGceCreds(self):
    if self._HasGceCreds():
      try:
        return credentials_lib.GceAssertionCredentials(
            cache_filename=GetGceCredentialCacheFilename())
      except apitools_exceptions.ResourceUnavailableError, e:
        if 'service account' in str(e) and 'does not exist' in str(e):
          return None
        raise

  def _GetDevshellCreds(self):
    try:
      return devshell.DevshellCredentials()
    except devshell.NoDevshellServer:
      return None
    except:
      raise

  def _GetNewDownloadHttp(self, download_stream):
    return GetNewHttp(http_class=HttpWithDownloadStream, stream=download_stream)

  def _GetNewUploadHttp(self):
    """Returns an upload-safe Http object (by disabling httplib2 retries)."""
    return GetNewHttp(http_class=HttpWithNoRetries)

  def GetBucket(self, bucket_name, provider=None, fields=None):
    """See CloudApi class for function doc strings."""
    projection = (apitools_messages.StorageBucketsGetRequest
                  .ProjectionValueValuesEnum.full)
    apitools_request = apitools_messages.StorageBucketsGetRequest(
        bucket=bucket_name, projection=projection)
    global_params = apitools_messages.StandardQueryParameters()
    if fields:
      global_params.fields = ','.join(set(fields))

    # Here and in list buckets, we have no way of knowing
    # whether we requested a field and didn't get it because it didn't exist
    # or because we didn't have permission to access it.
    try:
      return self.api_client.buckets.Get(apitools_request,
                                         global_params=global_params)
    except TRANSLATABLE_APITOOLS_EXCEPTIONS, e:
      self._TranslateExceptionAndRaise(e, bucket_name=bucket_name)

  def PatchBucket(self, bucket_name, metadata, canned_acl=None,
                  canned_def_acl=None, preconditions=None, provider=None,
                  fields=None):
    """See CloudApi class for function doc strings."""
    projection = (apitools_messages.StorageBucketsPatchRequest
                  .ProjectionValueValuesEnum.full)
    bucket_metadata = metadata

    if not preconditions:
      preconditions = Preconditions()

    # For blank metadata objects, we need to explicitly call
    # them out to apitools so it will send/erase them.
    apitools_include_fields = []
    for metadata_field in ('metadata', 'lifecycle', 'logging', 'versioning',
                           'website'):
      attr = getattr(bucket_metadata, metadata_field, None)
      if attr and not encoding.MessageToDict(attr):
        setattr(bucket_metadata, metadata_field, None)
        apitools_include_fields.append(metadata_field)

    if bucket_metadata.cors and bucket_metadata.cors == REMOVE_CORS_CONFIG:
      bucket_metadata.cors = []
      apitools_include_fields.append('cors')

    predefined_acl = None
    if canned_acl:
      # Must null out existing ACLs to apply a canned ACL.
      apitools_include_fields.append('acl')
      predefined_acl = (
          apitools_messages.StorageBucketsPatchRequest.
          PredefinedAclValueValuesEnum(
              self._BucketCannedAclToPredefinedAcl(canned_acl)))

    predefined_def_acl = None
    if canned_def_acl:
      # Must null out existing default object ACLs to apply a canned ACL.
      apitools_include_fields.append('defaultObjectAcl')
      predefined_def_acl = (
          apitools_messages.StorageBucketsPatchRequest.
          PredefinedDefaultObjectAclValueValuesEnum(
              self._ObjectCannedAclToPredefinedAcl(canned_def_acl)))

    apitools_request = apitools_messages.StorageBucketsPatchRequest(
        bucket=bucket_name, bucketResource=bucket_metadata,
        projection=projection,
        ifMetagenerationMatch=preconditions.meta_gen_match,
        predefinedAcl=predefined_acl,
        predefinedDefaultObjectAcl=predefined_def_acl)
    global_params = apitools_messages.StandardQueryParameters()
    if fields:
      global_params.fields = ','.join(set(fields))
    with self.api_client.IncludeFields(apitools_include_fields):
      try:
        return self.api_client.buckets.Patch(apitools_request,
                                             global_params=global_params)
      except TRANSLATABLE_APITOOLS_EXCEPTIONS, e:
        self._TranslateExceptionAndRaise(e)

  def CreateBucket(self, bucket_name, project_id=None, metadata=None,
                   provider=None, fields=None):
    """See CloudApi class for function doc strings."""
    projection = (apitools_messages.StorageBucketsInsertRequest
                  .ProjectionValueValuesEnum.full)
    if not metadata:
      metadata = apitools_messages.Bucket()
    metadata.name = bucket_name

    if metadata.location:
      metadata.location = metadata.location.upper()
    if metadata.storageClass:
      metadata.storageClass = metadata.storageClass.upper()

    project_id = PopulateProjectId(project_id)

    apitools_request = apitools_messages.StorageBucketsInsertRequest(
        bucket=metadata, project=project_id, projection=projection)
    global_params = apitools_messages.StandardQueryParameters()
    if fields:
      global_params.fields = ','.join(set(fields))
    try:
      return self.api_client.buckets.Insert(apitools_request,
                                            global_params=global_params)
    except TRANSLATABLE_APITOOLS_EXCEPTIONS, e:
      self._TranslateExceptionAndRaise(e, bucket_name=bucket_name)

  def DeleteBucket(self, bucket_name, preconditions=None, provider=None):
    """See CloudApi class for function doc strings."""
    if not preconditions:
      preconditions = Preconditions()

    apitools_request = apitools_messages.StorageBucketsDeleteRequest(
        bucket=bucket_name, ifMetagenerationMatch=preconditions.meta_gen_match)

    try:
      self.api_client.buckets.Delete(apitools_request)
    except TRANSLATABLE_APITOOLS_EXCEPTIONS, e:
      if isinstance(
          self._TranslateApitoolsException(e, bucket_name=bucket_name),
          NotEmptyException):
        # If bucket is not empty, check to see if versioning is enabled and
        # signal that in the exception if it is.
        bucket_metadata = self.GetBucket(bucket_name,
                                         fields=['versioning'])
        if bucket_metadata.versioning and bucket_metadata.versioning.enabled:
          raise NotEmptyException('VersionedBucketNotEmpty',
                                  status=e.status_code)
      self._TranslateExceptionAndRaise(e, bucket_name=bucket_name)

  def ListBuckets(self, project_id=None, provider=None, fields=None):
    """See CloudApi class for function doc strings."""
    projection = (apitools_messages.StorageBucketsListRequest
                  .ProjectionValueValuesEnum.full)
    project_id = PopulateProjectId(project_id)

    apitools_request = apitools_messages.StorageBucketsListRequest(
        project=project_id, maxResults=NUM_BUCKETS_PER_LIST_PAGE,
        projection=projection)
    global_params = apitools_messages.StandardQueryParameters()
    if fields:
      fields = set(fields)
      if 'nextPageToken' not in fields:
        fields.add('nextPageToken')
      global_params.fields = ','.join(fields)
    try:
      bucket_list = self.api_client.buckets.List(apitools_request,
                                                 global_params=global_params)
    except TRANSLATABLE_APITOOLS_EXCEPTIONS, e:
      self._TranslateExceptionAndRaise(e)

    for bucket in self._YieldBuckets(bucket_list):
      yield bucket

    while bucket_list.nextPageToken:
      apitools_request = apitools_messages.StorageBucketsListRequest(
          project=project_id, pageToken=bucket_list.nextPageToken,
          maxResults=NUM_BUCKETS_PER_LIST_PAGE, projection=projection)
      try:
        bucket_list = self.api_client.buckets.List(apitools_request,
                                                   global_params=global_params)
      except TRANSLATABLE_APITOOLS_EXCEPTIONS, e:
        self._TranslateExceptionAndRaise(e)

      for bucket in self._YieldBuckets(bucket_list):
        yield bucket

  def _YieldBuckets(self, bucket_list):
    """Yields buckets from a list returned by apitools."""
    if bucket_list.items:
      for bucket in bucket_list.items:
        yield bucket

  def ListObjects(self, bucket_name, prefix=None, delimiter=None,
                  all_versions=None, provider=None, fields=None):
    """See CloudApi class for function doc strings."""
    projection = (apitools_messages.StorageObjectsListRequest
                  .ProjectionValueValuesEnum.full)
    apitools_request = apitools_messages.StorageObjectsListRequest(
        bucket=bucket_name, prefix=prefix, delimiter=delimiter,
        versions=all_versions, projection=projection,
        maxResults=NUM_OBJECTS_PER_LIST_PAGE)
    global_params = apitools_messages.StandardQueryParameters()

    if fields:
      fields = set(fields)
      if 'nextPageToken' not in fields:
        fields.add('nextPageToken')
      global_params.fields = ','.join(fields)

    try:
      object_list = self.api_client.objects.List(apitools_request,
                                                 global_params=global_params)
    except TRANSLATABLE_APITOOLS_EXCEPTIONS, e:
      self._TranslateExceptionAndRaise(e, bucket_name=bucket_name)

    for object_or_prefix in self._YieldObjectsAndPrefixes(object_list):
      yield object_or_prefix

    while object_list.nextPageToken:
      apitools_request = apitools_messages.StorageObjectsListRequest(
          bucket=bucket_name, prefix=prefix, delimiter=delimiter,
          versions=all_versions, projection=projection,
          pageToken=object_list.nextPageToken,
          maxResults=NUM_OBJECTS_PER_LIST_PAGE)
      try:
        object_list = self.api_client.objects.List(apitools_request,
                                                   global_params=global_params)
      except TRANSLATABLE_APITOOLS_EXCEPTIONS, e:
        self._TranslateExceptionAndRaise(e, bucket_name=bucket_name)

      for object_or_prefix in self._YieldObjectsAndPrefixes(object_list):
        yield object_or_prefix

  def _YieldObjectsAndPrefixes(self, object_list):
    if object_list.items:
      for cloud_obj in object_list.items:
        yield CloudApi.CsObjectOrPrefix(cloud_obj,
                                        CloudApi.CsObjectOrPrefixType.OBJECT)
    if object_list.prefixes:
      for prefix in object_list.prefixes:
        yield CloudApi.CsObjectOrPrefix(prefix,
                                        CloudApi.CsObjectOrPrefixType.PREFIX)

  def GetObjectMetadata(self, bucket_name, object_name, generation=None,
                        provider=None, fields=None):
    """See CloudApi class for function doc strings."""
    projection = (apitools_messages.StorageObjectsGetRequest
                  .ProjectionValueValuesEnum.full)

    if generation:
      generation = long(generation)

    apitools_request = apitools_messages.StorageObjectsGetRequest(
        bucket=bucket_name, object=object_name, projection=projection,
        generation=generation)
    global_params = apitools_messages.StandardQueryParameters()
    if fields:
      global_params.fields = ','.join(set(fields))

    try:
      return self.api_client.objects.Get(apitools_request,
                                         global_params=global_params)
    except TRANSLATABLE_APITOOLS_EXCEPTIONS, e:
      self._TranslateExceptionAndRaise(e, bucket_name=bucket_name,
                                       object_name=object_name,
                                       generation=generation)

  def GetObjectMedia(
      self, bucket_name, object_name, download_stream,
      provider=None, generation=None, object_size=None,
      download_strategy=CloudApi.DownloadStrategy.ONE_SHOT, start_byte=0,
      end_byte=None, progress_callback=None, serialization_data=None,
      digesters=None):
    """See CloudApi class for function doc strings."""
    # This implementation will get the object metadata first if we don't pass it
    # in via serialization_data.
    if generation:
      generation = long(generation)

    outer_total_size = object_size
    if serialization_data:
      outer_total_size = json.loads(serialization_data)['total_size']

    if progress_callback:
      if outer_total_size is None:
        raise ArgumentException('Download size is required when callbacks are '
                                'requested for a download, but no size was '
                                'provided.')
      progress_callback(0, outer_total_size)

    bytes_downloaded_container = BytesTransferredContainer()
    bytes_downloaded_container.bytes_transferred = start_byte

    callback_class_factory = DownloadCallbackConnectionClassFactory(
        bytes_downloaded_container, total_size=outer_total_size,
        progress_callback=progress_callback, digesters=digesters)
    download_http_class = callback_class_factory.GetConnectionClass()

    download_http = self._GetNewDownloadHttp(download_stream)
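    # apitools' http_wrapper looks up http.connections by URL scheme to choose
    # the connection class for a request, so the callback connection class
    # created above will observe each chunk of downloaded bytes.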
    download_http.connections = {'https': download_http_class}
    authorized_download_http = self.credentials.authorize(download_http)
    WrapDownloadHttpRequest(authorized_download_http)

    if serialization_data:
      apitools_download = apitools_transfer.Download.FromData(
          download_stream, serialization_data, self.api_client.http,
          num_retries=self.num_retries)
    else:
      apitools_download = apitools_transfer.Download.FromStream(
          download_stream, auto_transfer=False, total_size=object_size,
          num_retries=self.num_retries)

    apitools_download.bytes_http = authorized_download_http
    apitools_request = apitools_messages.StorageObjectsGetRequest(
        bucket=bucket_name, object=object_name, generation=generation)

    try:
      if download_strategy == CloudApi.DownloadStrategy.RESUMABLE:
        # Disable retries in apitools. We will handle them explicitly here.
        apitools_download.retry_func = (
            apitools_http_wrapper.RethrowExceptionHandler)
        return self._PerformResumableDownload(
            bucket_name, object_name, download_stream, apitools_request,
            apitools_download, bytes_downloaded_container,
            generation=generation, start_byte=start_byte, end_byte=end_byte,
            serialization_data=serialization_data)
      else:
        return self._PerformDownload(
            bucket_name, object_name, download_stream, apitools_request,
            apitools_download, generation=generation, start_byte=start_byte,
            end_byte=end_byte, serialization_data=serialization_data)
    except TRANSLATABLE_APITOOLS_EXCEPTIONS, e:
      self._TranslateExceptionAndRaise(e, bucket_name=bucket_name,
                                       object_name=object_name,
                                       generation=generation)

  def _PerformResumableDownload(
      self, bucket_name, object_name, download_stream, apitools_request,
      apitools_download, bytes_downloaded_container, generation=None,
      start_byte=0, end_byte=None, serialization_data=None):
    retries = 0
    last_progress_byte = start_byte
    while retries <= self.num_retries:
      try:
        return self._PerformDownload(
            bucket_name, object_name, download_stream, apitools_request,
            apitools_download, generation=generation, start_byte=start_byte,
            end_byte=end_byte, serialization_data=serialization_data)
      except HTTP_TRANSFER_EXCEPTIONS, e:
        start_byte = download_stream.tell()
        bytes_downloaded_container.bytes_transferred = start_byte
        if start_byte > last_progress_byte:
          # We've made progress, so allow a fresh set of retries.
          last_progress_byte = start_byte
          retries = 0
        retries += 1
        if retries > self.num_retries:
          raise ResumableDownloadException(
              'Transfer failed after %d retries. Final exception: %s' %
              (self.num_retries, unicode(e).encode(UTF8)))
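        # CalculateWaitForRetry provides jittered exponential backoff between
        # attempts, capped at GetMaxRetryDelay() seconds.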
        time.sleep(CalculateWaitForRetry(retries, max_wait=GetMaxRetryDelay()))
        if self.logger.isEnabledFor(logging.DEBUG):
          self.logger.debug(
              'Retrying download from byte %s after exception: %s. Trace: %s',
              start_byte, unicode(e).encode(UTF8), traceback.format_exc())
        apitools_http_wrapper.RebuildHttpConnections(
            apitools_download.bytes_http)

  def _PerformDownload(
      self, bucket_name, object_name, download_stream, apitools_request,
      apitools_download, generation=None, start_byte=0, end_byte=None,
      serialization_data=None):
    if not serialization_data:
      try:
        self.api_client.objects.Get(apitools_request,
                                    download=apitools_download)
      except TRANSLATABLE_APITOOLS_EXCEPTIONS, e:
        self._TranslateExceptionAndRaise(e, bucket_name=bucket_name,
                                         object_name=object_name,
                                         generation=generation)

    # Disable apitools' default print callbacks.
    def _NoOpCallback(unused_response, unused_download_object):
      pass

    # TODO: If we have a resumable download with accept-encoding:gzip
    # on an object that is compressible but not in gzip form in the cloud,
    # on-the-fly compression will gzip the object. In this case if our
    # download breaks, future requests will ignore the range header and just
    # return the object (gzipped) in its entirety. Ideally, we would unzip
    # the bytes that we have locally and send a range request without
    # accept-encoding:gzip so that we can download only the (uncompressed) bytes
    # that we don't yet have.

    # Since bytes_http is created in this function, we don't get the
    # user-agent header from api_client's http automatically.
    additional_headers = {
        'accept-encoding': 'gzip',
        'user-agent': self.api_client.user_agent
    }
    if start_byte or end_byte:
      apitools_download.GetRange(additional_headers=additional_headers,
                                 start=start_byte, end=end_byte,
                                 use_chunks=False)
    else:
      apitools_download.StreamMedia(
          callback=_NoOpCallback, finish_callback=_NoOpCallback,
          additional_headers=additional_headers, use_chunks=False)
    return apitools_download.encoding

  def PatchObjectMetadata(self, bucket_name, object_name, metadata,
                          canned_acl=None, generation=None, preconditions=None,
                          provider=None, fields=None):
    """See CloudApi class for function doc strings."""
    projection = (apitools_messages.StorageObjectsPatchRequest
                  .ProjectionValueValuesEnum.full)

    if not preconditions:
      preconditions = Preconditions()

    if generation:
      generation = long(generation)

    predefined_acl = None
    apitools_include_fields = []
    if canned_acl:
      # Must null out existing ACLs to apply a canned ACL.
      apitools_include_fields.append('acl')
      predefined_acl = (
          apitools_messages.StorageObjectsPatchRequest.
          PredefinedAclValueValuesEnum(
              self._ObjectCannedAclToPredefinedAcl(canned_acl)))

    apitools_request = apitools_messages.StorageObjectsPatchRequest(
        bucket=bucket_name, object=object_name, objectResource=metadata,
        generation=generation, projection=projection,
        ifGenerationMatch=preconditions.gen_match,
        ifMetagenerationMatch=preconditions.meta_gen_match,
        predefinedAcl=predefined_acl)
    global_params = apitools_messages.StandardQueryParameters()
    if fields:
      global_params.fields = ','.join(set(fields))

    try:
      with self.api_client.IncludeFields(apitools_include_fields):
        return self.api_client.objects.Patch(apitools_request,
                                             global_params=global_params)
    except TRANSLATABLE_APITOOLS_EXCEPTIONS, e:
      self._TranslateExceptionAndRaise(e, bucket_name=bucket_name,
                                       object_name=object_name,
                                       generation=generation)

  def _UploadObject(self, upload_stream, object_metadata, canned_acl=None,
                    size=None, preconditions=None, provider=None, fields=None,
                    serialization_data=None, tracker_callback=None,
                    progress_callback=None,
                    apitools_strategy=apitools_transfer.SIMPLE_UPLOAD,
                    total_size=0):
    # pylint: disable=g-doc-args
    """Upload implementation. Cloud API arguments, plus two more.

    Additional args:
      apitools_strategy: SIMPLE_UPLOAD or RESUMABLE_UPLOAD.
      total_size: Total size of the upload; None if it is unknown (streaming).

    Returns:
      Uploaded object metadata.
    """
    # pylint: enable=g-doc-args
    ValidateDstObjectMetadata(object_metadata)
    predefined_acl = None
    if canned_acl:
      predefined_acl = (
          apitools_messages.StorageObjectsInsertRequest.
          PredefinedAclValueValuesEnum(
              self._ObjectCannedAclToPredefinedAcl(canned_acl)))

    bytes_uploaded_container = BytesTransferredContainer()

    if progress_callback and size:
      total_size = size
      progress_callback(0, size)

    callback_class_factory = UploadCallbackConnectionClassFactory(
        bytes_uploaded_container, total_size=total_size,
        progress_callback=progress_callback)

    upload_http = self._GetNewUploadHttp()
    upload_http_class = callback_class_factory.GetConnectionClass()
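    # As with downloads, registering the connection class per scheme lets
    # apitools' http_wrapper route upload bytes through the progress-callback
    # connection class created above.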
    upload_http.connections = {'http': upload_http_class,
                               'https': upload_http_class}

    authorized_upload_http = self.credentials.authorize(upload_http)
    WrapUploadHttpRequest(authorized_upload_http)
    # Since bytes_http is created in this function, we don't get the
    # user-agent header from api_client's http automatically.
    additional_headers = {
        'user-agent': self.api_client.user_agent
    }

    try:
      content_type = None
      apitools_request = None
      global_params = None
      if not serialization_data:
        # This is a new upload, set up initial upload state.
        content_type = object_metadata.contentType
        if not content_type:
          content_type = DEFAULT_CONTENT_TYPE

        if not preconditions:
          preconditions = Preconditions()

        apitools_request = apitools_messages.StorageObjectsInsertRequest(
            bucket=object_metadata.bucket, object=object_metadata,
            ifGenerationMatch=preconditions.gen_match,
            ifMetagenerationMatch=preconditions.meta_gen_match,
            predefinedAcl=predefined_acl)
        global_params = apitools_messages.StandardQueryParameters()
        if fields:
          global_params.fields = ','.join(set(fields))

      if apitools_strategy == apitools_transfer.SIMPLE_UPLOAD:
        # One-shot upload.
        apitools_upload = apitools_transfer.Upload(
            upload_stream, content_type, total_size=size, auto_transfer=True,
            num_retries=self.num_retries)
        apitools_upload.strategy = apitools_strategy
        apitools_upload.bytes_http = authorized_upload_http

        return self.api_client.objects.Insert(
            apitools_request,
            upload=apitools_upload,
            global_params=global_params)
      else:  # Resumable upload.
        return self._PerformResumableUpload(
            upload_stream, authorized_upload_http, content_type, size,
            serialization_data, apitools_strategy, apitools_request,
            global_params, bytes_uploaded_container, tracker_callback,
            additional_headers, progress_callback)
    except TRANSLATABLE_APITOOLS_EXCEPTIONS, e:
      self._TranslateExceptionAndRaise(e, bucket_name=object_metadata.bucket,
                                       object_name=object_metadata.name)

  def _PerformResumableUpload(
      self, upload_stream, authorized_upload_http, content_type, size,
      serialization_data, apitools_strategy, apitools_request, global_params,
      bytes_uploaded_container, tracker_callback, addl_headers,
      progress_callback):
    try:
      if serialization_data:
        # Resuming an existing upload.
        apitools_upload = apitools_transfer.Upload.FromData(
            upload_stream, serialization_data, self.api_client.http,
            num_retries=self.num_retries)
        apitools_upload.chunksize = GetJsonResumableChunkSize()
        apitools_upload.bytes_http = authorized_upload_http
      else:
        # New resumable upload.
        apitools_upload = apitools_transfer.Upload(
            upload_stream, content_type, total_size=size,
            chunksize=GetJsonResumableChunkSize(), auto_transfer=False,
            num_retries=self.num_retries)
        apitools_upload.strategy = apitools_strategy
        apitools_upload.bytes_http = authorized_upload_http
        self.api_client.objects.Insert(
            apitools_request,
            upload=apitools_upload,
            global_params=global_params)
      # Disable retries in apitools. We will handle them explicitly here.
      apitools_upload.retry_func = (
          apitools_http_wrapper.RethrowExceptionHandler)

      # Disable apitools' default print callbacks.
      def _NoOpCallback(unused_response, unused_upload_object):
        pass

      # If we're resuming an upload, apitools has at this point received
      # from the server how many bytes it already has. Update our
      # callback class with this information.
      bytes_uploaded_container.bytes_transferred = apitools_upload.progress
      if tracker_callback:
        tracker_callback(json.dumps(apitools_upload.serialization_data))

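      # Two retry loops below: the outer loop retries the transfer itself,
      # while the inner loop (in the except clause) retries querying the
      # server for the number of bytes it has received so far.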
      retries = 0
      last_progress_byte = apitools_upload.progress
      while retries <= self.num_retries:
        try:
          # TODO: On retry, this will seek to the bytes that the server has,
          # causing the hash to be recalculated. Make HashingFileUploadWrapper
          # save a digest according to json_resumable_chunk_size.
          if size:
            # If size is known, we can send it all in one request and avoid
            # making a round-trip per chunk.
            http_response = apitools_upload.StreamMedia(
                callback=_NoOpCallback, finish_callback=_NoOpCallback,
                additional_headers=addl_headers)
          else:
            # Otherwise it's a streaming request and we need to ensure that we
            # send the bytes in chunks so that we can guarantee that we never
            # need to seek backwards more than our buffer (and also that the
            # chunks are aligned to 256KB).
            http_response = apitools_upload.StreamInChunks(
                callback=_NoOpCallback, finish_callback=_NoOpCallback,
                additional_headers=addl_headers)
          processed_response = self.api_client.objects.ProcessHttpResponse(
              self.api_client.objects.GetMethodConfig('Insert'), http_response)
          if size is None and progress_callback:
            # Make final progress callback; total size should now be known.
            # This works around the fact the send function counts header bytes.
            # However, this will make the progress appear to go slightly
            # backwards at the end.
            progress_callback(apitools_upload.total_size,
                              apitools_upload.total_size)
          return processed_response
        except HTTP_TRANSFER_EXCEPTIONS, e:
          apitools_http_wrapper.RebuildHttpConnections(
              apitools_upload.bytes_http)
          while retries <= self.num_retries:
            try:
              # TODO: Simulate the refresh case in tests. Right now, our
              # mocks are not complex enough to simulate a failure.
              apitools_upload.RefreshResumableUploadState()
              start_byte = apitools_upload.progress
              bytes_uploaded_container.bytes_transferred = start_byte
              break
            except HTTP_TRANSFER_EXCEPTIONS, e2:
              apitools_http_wrapper.RebuildHttpConnections(
                  apitools_upload.bytes_http)
              retries += 1
              if retries > self.num_retries:
                raise ResumableUploadException(
                    'Transfer failed after %d retries. Final exception: %s' %
                    (self.num_retries, e2))
              time.sleep(
                  CalculateWaitForRetry(retries, max_wait=GetMaxRetryDelay()))
          if start_byte > last_progress_byte:
            # We've made progress, so allow a fresh set of retries.
            last_progress_byte = start_byte
            retries = 0
          else:
            retries += 1
            if retries > self.num_retries:
              raise ResumableUploadException(
                  'Transfer failed after %d retries. Final exception: %s' %
                  (self.num_retries, unicode(e).encode(UTF8)))
            time.sleep(
                CalculateWaitForRetry(retries, max_wait=GetMaxRetryDelay()))
          if self.logger.isEnabledFor(logging.DEBUG):
            self.logger.debug(
                'Retrying upload from byte %s after exception: %s. Trace: %s',
                start_byte, unicode(e).encode(UTF8), traceback.format_exc())
    except TRANSLATABLE_APITOOLS_EXCEPTIONS, e:
      resumable_ex = self._TranslateApitoolsResumableUploadException(e)
      if resumable_ex:
        raise resumable_ex
      else:
        raise

  def UploadObject(self, upload_stream, object_metadata, canned_acl=None,
                   size=None, preconditions=None, progress_callback=None,
                   provider=None, fields=None):
    """See CloudApi class for function doc strings."""
    return self._UploadObject(
        upload_stream, object_metadata, canned_acl=canned_acl,
        size=size, preconditions=preconditions,
        progress_callback=progress_callback, fields=fields,
        apitools_strategy=apitools_transfer.SIMPLE_UPLOAD)

  def UploadObjectStreaming(self, upload_stream, object_metadata,
                            canned_acl=None, preconditions=None,
                            progress_callback=None, provider=None,
                            fields=None):
    """See CloudApi class for function doc strings."""
    # Streaming indicated by not passing a size.
    # Resumable capabilities are present up to the resumable chunk size using
    # a buffered stream.
    return self._UploadObject(
        upload_stream, object_metadata, canned_acl=canned_acl,
        preconditions=preconditions, progress_callback=progress_callback,
        fields=fields, apitools_strategy=apitools_transfer.RESUMABLE_UPLOAD,
        total_size=None)

  def UploadObjectResumable(
      self, upload_stream, object_metadata, canned_acl=None, preconditions=None,
      provider=None, fields=None, size=None, serialization_data=None,
      tracker_callback=None, progress_callback=None):
    """See CloudApi class for function doc strings."""
    return self._UploadObject(
        upload_stream, object_metadata, canned_acl=canned_acl,
        preconditions=preconditions, fields=fields, size=size,
        serialization_data=serialization_data,
        tracker_callback=tracker_callback, progress_callback=progress_callback,
        apitools_strategy=apitools_transfer.RESUMABLE_UPLOAD)

  def CopyObject(self, src_obj_metadata, dst_obj_metadata, src_generation=None,
                 canned_acl=None, preconditions=None, progress_callback=None,
                 max_bytes_per_call=None, provider=None, fields=None):
    """See CloudApi class for function doc strings."""
    ValidateDstObjectMetadata(dst_obj_metadata)
    predefined_acl = None
    if canned_acl:
      predefined_acl = (
          apitools_messages.StorageObjectsRewriteRequest.
          DestinationPredefinedAclValueValuesEnum(
              self._ObjectCannedAclToPredefinedAcl(canned_acl)))

    if src_generation:
      src_generation = long(src_generation)

    if not preconditions:
      preconditions = Preconditions()

    projection = (apitools_messages.StorageObjectsRewriteRequest.
                  ProjectionValueValuesEnum.full)
    global_params = apitools_messages.StandardQueryParameters()
    if fields:
      # Rewrite returns the resultant object under the 'resource' field.
      new_fields = set(['done', 'objectSize', 'rewriteToken',
                        'totalBytesRewritten'])
      for field in fields:
        new_fields.add('resource/' + field)
      global_params.fields = ','.join(set(new_fields))

    # Check to see if we are resuming a rewrite.
    tracker_file_name = GetRewriteTrackerFilePath(
        src_obj_metadata.bucket, src_obj_metadata.name, dst_obj_metadata.bucket,
        dst_obj_metadata.name, 'JSON')
    rewrite_params_hash = HashRewriteParameters(
        src_obj_metadata, dst_obj_metadata, projection,
        src_generation=src_generation, gen_match=preconditions.gen_match,
        meta_gen_match=preconditions.meta_gen_match,
        canned_acl=predefined_acl, fields=global_params.fields,
        max_bytes_per_call=max_bytes_per_call)
    resume_rewrite_token = ReadRewriteTrackerFile(tracker_file_name,
                                                  rewrite_params_hash)

    progress_cb_with_backoff = None
    try:
      last_bytes_written = 0L
      while True:
        apitools_request = apitools_messages.StorageObjectsRewriteRequest(
            sourceBucket=src_obj_metadata.bucket,
            sourceObject=src_obj_metadata.name,
            destinationBucket=dst_obj_metadata.bucket,
            destinationObject=dst_obj_metadata.name,
            projection=projection, object=dst_obj_metadata,
            sourceGeneration=src_generation,
            ifGenerationMatch=preconditions.gen_match,
            ifMetagenerationMatch=preconditions.meta_gen_match,
            destinationPredefinedAcl=predefined_acl,
            rewriteToken=resume_rewrite_token,
            maxBytesRewrittenPerCall=max_bytes_per_call)
        rewrite_response = self.api_client.objects.Rewrite(
            apitools_request, global_params=global_params)
        bytes_written = long(rewrite_response.totalBytesRewritten)
        if progress_callback and not progress_cb_with_backoff:
          progress_cb_with_backoff = ProgressCallbackWithBackoff(
              long(rewrite_response.objectSize), progress_callback)
        if progress_cb_with_backoff:
          progress_cb_with_backoff.Progress(
              bytes_written - last_bytes_written)

        if rewrite_response.done:
          break
        elif not resume_rewrite_token:
          # Save the token and make a tracker file if they don't already exist.
          resume_rewrite_token = rewrite_response.rewriteToken
          WriteRewriteTrackerFile(tracker_file_name, rewrite_params_hash,
                                  rewrite_response.rewriteToken)
        last_bytes_written = bytes_written

      DeleteTrackerFile(tracker_file_name)
      return rewrite_response.resource
    except TRANSLATABLE_APITOOLS_EXCEPTIONS, e:
      self._TranslateExceptionAndRaise(e, bucket_name=dst_obj_metadata.bucket,
                                       object_name=dst_obj_metadata.name)

  def DeleteObject(self, bucket_name, object_name, preconditions=None,
                   generation=None, provider=None):
    """See CloudApi class for function doc strings."""
    if not preconditions:
      preconditions = Preconditions()

    if generation:
      generation = long(generation)

    apitools_request = apitools_messages.StorageObjectsDeleteRequest(
        bucket=bucket_name, object=object_name, generation=generation,
        ifGenerationMatch=preconditions.gen_match,
        ifMetagenerationMatch=preconditions.meta_gen_match)
    try:
      return self.api_client.objects.Delete(apitools_request)
    except TRANSLATABLE_APITOOLS_EXCEPTIONS, e:
      self._TranslateExceptionAndRaise(e, bucket_name=bucket_name,
                                       object_name=object_name,
                                       generation=generation)

  def ComposeObject(self, src_objs_metadata, dst_obj_metadata,
                    preconditions=None, provider=None, fields=None):
    """See CloudApi class for function doc strings."""
    ValidateDstObjectMetadata(dst_obj_metadata)

    dst_obj_name = dst_obj_metadata.name
    dst_obj_metadata.name = None
    dst_bucket_name = dst_obj_metadata.bucket
    dst_obj_metadata.bucket = None
    if not dst_obj_metadata.contentType:
      dst_obj_metadata.contentType = DEFAULT_CONTENT_TYPE

    if not preconditions:
      preconditions = Preconditions()

    global_params = apitools_messages.StandardQueryParameters()
    if fields:
      global_params.fields = ','.join(set(fields))

    src_objs_compose_request = apitools_messages.ComposeRequest(
        sourceObjects=src_objs_metadata, destination=dst_obj_metadata)

    apitools_request = apitools_messages.StorageObjectsComposeRequest(
        composeRequest=src_objs_compose_request,
        destinationBucket=dst_bucket_name,
        destinationObject=dst_obj_name,
        ifGenerationMatch=preconditions.gen_match,
        ifMetagenerationMatch=preconditions.meta_gen_match)
    try:
      return self.api_client.objects.Compose(apitools_request,
                                             global_params=global_params)
    except TRANSLATABLE_APITOOLS_EXCEPTIONS, e:
      # We can't be sure which object was missing in the 404 case.
      if isinstance(e, apitools_exceptions.HttpError) and e.status_code == 404:
        raise NotFoundException('One of the source objects does not exist.')
      else:
        self._TranslateExceptionAndRaise(e)

  def WatchBucket(self, bucket_name, address, channel_id, token=None,
                  provider=None, fields=None):
    """See CloudApi class for function doc strings."""
    projection = (apitools_messages.StorageObjectsWatchAllRequest
                  .ProjectionValueValuesEnum.full)

    channel = apitools_messages.Channel(address=address, id=channel_id,
                                        token=token, type='WEB_HOOK')

    apitools_request = apitools_messages.StorageObjectsWatchAllRequest(
        bucket=bucket_name, channel=channel, projection=projection)

    global_params = apitools_messages.StandardQueryParameters()
    if fields:
      global_params.fields = ','.join(set(fields))

    try:
      return self.api_client.objects.WatchAll(apitools_request,
                                              global_params=global_params)
    except TRANSLATABLE_APITOOLS_EXCEPTIONS, e:
      self._TranslateExceptionAndRaise(e, bucket_name=bucket_name)

  def StopChannel(self, channel_id, resource_id, provider=None):
    """See CloudApi class for function doc strings."""
    channel = apitools_messages.Channel(id=channel_id, resourceId=resource_id)
    try:
      self.api_client.channels.Stop(channel)
    except TRANSLATABLE_APITOOLS_EXCEPTIONS, e:
      self._TranslateExceptionAndRaise(e)

  def _BucketCannedAclToPredefinedAcl(self, canned_acl_string):
    """Translates the input string to a bucket PredefinedAcl string.

    Args:
      canned_acl_string: Canned ACL string.

    Returns:
      String that can be used as a query parameter with the JSON API. This
      corresponds to a flavor of *PredefinedAclValueValuesEnum and can be
      used as input to apitools requests that affect bucket access controls.
    """
    # XML : JSON
    translation_dict = {
        None: None,
        'authenticated-read': 'authenticatedRead',
        'private': 'private',
        'project-private': 'projectPrivate',
        'public-read': 'publicRead',
        'public-read-write': 'publicReadWrite'
    }
    if canned_acl_string in translation_dict:
      return translation_dict[canned_acl_string]
    raise ArgumentException('Invalid canned ACL %s' % canned_acl_string)

  def _ObjectCannedAclToPredefinedAcl(self, canned_acl_string):
    """Translates the input string to an object PredefinedAcl string.

    Args:
      canned_acl_string: Canned ACL string.

    Returns:
      String that can be used as a query parameter with the JSON API. This
      corresponds to a flavor of *PredefinedAclValueValuesEnum and can be
      used as input to apitools requests that affect object access controls.
    """
    # XML : JSON
    translation_dict = {
        None: None,
        'authenticated-read': 'authenticatedRead',
        'bucket-owner-read': 'bucketOwnerRead',
        'bucket-owner-full-control': 'bucketOwnerFullControl',
        'private': 'private',
        'project-private': 'projectPrivate',
        'public-read': 'publicRead'
    }
    if canned_acl_string in translation_dict:
      return translation_dict[canned_acl_string]
    raise ArgumentException('Invalid canned ACL %s' % canned_acl_string)

  def _TranslateExceptionAndRaise(self, e, bucket_name=None, object_name=None,
                                  generation=None):
    """Translates an HTTP exception and raises the translated or original value.

    Args:
      e: Any Exception.
      bucket_name: Optional bucket name in request that caused the exception.
      object_name: Optional object name in request that caused the exception.
      generation: Optional generation in request that caused the exception.

    Raises:
      Translated CloudApi exception, or the original exception if it was not
      translatable.
    """
    translated_exception = self._TranslateApitoolsException(
        e, bucket_name=bucket_name, object_name=object_name,
        generation=generation)
    if translated_exception:
      raise translated_exception
    else:
      raise

  def _GetMessageFromHttpError(self, http_error):
    if isinstance(http_error, apitools_exceptions.HttpError):
      if getattr(http_error, 'content', None):
        try:
          json_obj = json.loads(http_error.content)
          if 'error' in json_obj and 'message' in json_obj['error']:
            return json_obj['error']['message']
        except Exception:  # pylint: disable=broad-except
          # If we couldn't decode anything, just leave the message as None.
          pass

  def _TranslateApitoolsResumableUploadException(
      self, e, bucket_name=None, object_name=None, generation=None):
    if isinstance(e, apitools_exceptions.HttpError):
      message = self._GetMessageFromHttpError(e)
      if (e.status_code == 503 and
          self.http.disable_ssl_certificate_validation):
        return ServiceException(_VALIDATE_CERTIFICATES_503_MESSAGE,
                                status=e.status_code)
      elif e.status_code >= 500:
        return ResumableUploadException(
            message or 'Server Error', status=e.status_code)
      elif e.status_code == 429:
        return ResumableUploadException(
            message or 'Too Many Requests', status=e.status_code)
      elif e.status_code == 410:
        return ResumableUploadStartOverException(
            message or 'Bad Request', status=e.status_code)
      elif e.status_code == 404:
        return ResumableUploadStartOverException(
            message or 'Bad Request', status=e.status_code)
      elif e.status_code >= 400:
        return ResumableUploadAbortException(
            message or 'Bad Request', status=e.status_code)
    if isinstance(e, apitools_exceptions.StreamExhausted):
      return ResumableUploadAbortException(e.message)
    if (isinstance(e, apitools_exceptions.TransferError) and
        ('Aborting transfer' in e.message or
         'Not enough bytes in stream' in e.message or
         'additional bytes left in stream' in e.message)):
      return ResumableUploadAbortException(e.message)

  def _TranslateApitoolsException(self, e, bucket_name=None, object_name=None,
                                  generation=None):
    """Translates apitools exceptions into their gsutil Cloud Api equivalents.

    Args:
      e: Any exception in TRANSLATABLE_APITOOLS_EXCEPTIONS.
      bucket_name: Optional bucket name in request that caused the exception.
      object_name: Optional object name in request that caused the exception.
      generation: Optional generation in request that caused the exception.

    Returns:
      CloudStorageApiServiceException for translatable exceptions, None
      otherwise.
    """
    if isinstance(e, apitools_exceptions.HttpError):
      message = self._GetMessageFromHttpError(e)
      if e.status_code == 400:
        # It is possible that the Project ID is incorrect. Unfortunately the
        # JSON API does not give us much information about what part of the
        # request was bad.
        return BadRequestException(message or 'Bad Request',
                                   status=e.status_code)
      elif e.status_code == 401:
        if 'Login Required' in str(e):
          return AccessDeniedException(
              message or 'Access denied: login required.',
              status=e.status_code)
      elif e.status_code == 403:
        if 'The account for the specified project has been disabled' in str(e):
          return AccessDeniedException(message or 'Account disabled.',
                                       status=e.status_code)
        elif 'Daily Limit for Unauthenticated Use Exceeded' in str(e):
          return AccessDeniedException(
              message or 'Access denied: quota exceeded. '
              'Is your project ID valid?',
              status=e.status_code)
        elif 'The bucket you tried to delete was not empty.' in str(e):
          return NotEmptyException('BucketNotEmpty (%s)' % bucket_name,
                                   status=e.status_code)
        elif ('The bucket you tried to create requires domain ownership '
              'verification.' in str(e)):
          return AccessDeniedException(
              'The bucket you tried to create requires domain ownership '
              'verification. Please see '
              'https://developers.google.com/storage/docs/bucketnaming'
              '?hl=en#verification for more details.', status=e.status_code)
        elif 'User Rate Limit Exceeded' in str(e):
          return AccessDeniedException('Rate limit exceeded. Please retry this '
                                       'request later.', status=e.status_code)
        elif 'Access Not Configured' in str(e):
          return AccessDeniedException(
              'Access Not Configured. Please go to the Google Developers '
              'Console (https://cloud.google.com/console#/project) for your '
              'project, select APIs and Auth and enable the '
              'Google Cloud Storage JSON API.',
              status=e.status_code)
        else:
          return AccessDeniedException(message or e.message,
                                       status=e.status_code)
      elif e.status_code == 404:
        if bucket_name:
          if object_name:
            return CreateObjectNotFoundException(e.status_code, self.provider,
                                                 bucket_name, object_name,
                                                 generation=generation)
          return CreateBucketNotFoundException(e.status_code, self.provider,
                                               bucket_name)
        return NotFoundException(e.message, status=e.status_code)
      elif e.status_code == 409 and bucket_name:
        if 'The bucket you tried to delete was not empty.' in str(e):
          return NotEmptyException('BucketNotEmpty (%s)' % bucket_name,
                                   status=e.status_code)
        return ServiceException(
            'Bucket %s already exists.' % bucket_name, status=e.status_code)
      elif e.status_code == 412:
        return PreconditionException(message, status=e.status_code)
      elif (e.status_code == 503 and
            self.http.disable_ssl_certificate_validation):
        return ServiceException(_VALIDATE_CERTIFICATES_503_MESSAGE,
                                status=e.status_code)
      return ServiceException(message, status=e.status_code)
    elif isinstance(e, apitools_exceptions.TransferInvalidError):
      return ServiceException('Transfer invalid (possible encoding error: %s)'
                              % str(e))