| OLD | NEW |
| 1 # -*- coding: utf-8 -*- | 1 # -*- coding: utf-8 -*- |
| 2 # Copyright 2011 Google Inc. All Rights Reserved. | 2 # Copyright 2011 Google Inc. All Rights Reserved. |
| 3 # Copyright 2011, Nexenta Systems Inc. | 3 # Copyright 2011, Nexenta Systems Inc. |
| 4 # | 4 # |
| 5 # Licensed under the Apache License, Version 2.0 (the "License"); | 5 # Licensed under the Apache License, Version 2.0 (the "License"); |
| 6 # you may not use this file except in compliance with the License. | 6 # you may not use this file except in compliance with the License. |
| 7 # You may obtain a copy of the License at | 7 # You may obtain a copy of the License at |
| 8 # | 8 # |
| 9 # http://www.apache.org/licenses/LICENSE-2.0 | 9 # http://www.apache.org/licenses/LICENSE-2.0 |
| 10 # | 10 # |
| 11 # Unless required by applicable law or agreed to in writing, software | 11 # Unless required by applicable law or agreed to in writing, software |
| 12 # distributed under the License is distributed on an "AS IS" BASIS, | 12 # distributed under the License is distributed on an "AS IS" BASIS, |
| 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 14 # See the License for the specific language governing permissions and | 14 # See the License for the specific language governing permissions and |
| 15 # limitations under the License. | 15 # limitations under the License. |
| 16 """Helper functions for copy functionality.""" | 16 """Helper functions for copy functionality.""" |
| 17 | 17 |
| 18 from __future__ import absolute_import | 18 from __future__ import absolute_import |
| 19 | 19 |
| 20 import base64 | 20 import base64 |
| 21 from collections import namedtuple | 21 from collections import namedtuple |
| 22 import csv | 22 import csv |
| 23 import datetime | 23 import datetime |
| 24 import errno | 24 import errno |
| 25 import gzip | 25 import gzip |
| 26 from hashlib import md5 | 26 from hashlib import md5 |
| 27 import json | 27 import json |
| 28 import logging | 28 import logging |
| 29 import mimetypes | 29 import mimetypes |
| 30 import multiprocessing | 30 from operator import attrgetter |
| 31 import os | 31 import os |
| 32 import pickle | 32 import pickle |
| 33 import random | 33 import random |
| 34 import re | 34 import re |
| 35 import shutil | 35 import shutil |
| 36 import stat | 36 import stat |
| 37 import subprocess | 37 import subprocess |
| 38 import tempfile | 38 import tempfile |
| 39 import textwrap | 39 import textwrap |
| 40 import time | 40 import time |
| 41 import traceback | 41 import traceback |
| 42 | 42 |
| 43 from boto import config | 43 from boto import config |
| 44 import crcmod | 44 import crcmod |
| 45 | 45 |
| 46 import gslib | 46 import gslib |
| 47 from gslib.cloud_api import ArgumentException | 47 from gslib.cloud_api import ArgumentException |
| 48 from gslib.cloud_api import CloudApi | 48 from gslib.cloud_api import CloudApi |
| 49 from gslib.cloud_api import NotFoundException | 49 from gslib.cloud_api import NotFoundException |
| 50 from gslib.cloud_api import PreconditionException | 50 from gslib.cloud_api import PreconditionException |
| 51 from gslib.cloud_api import Preconditions | 51 from gslib.cloud_api import Preconditions |
| 52 from gslib.cloud_api import ResumableDownloadException | 52 from gslib.cloud_api import ResumableDownloadException |
| 53 from gslib.cloud_api import ResumableUploadAbortException | 53 from gslib.cloud_api import ResumableUploadAbortException |
| 54 from gslib.cloud_api import ResumableUploadException | 54 from gslib.cloud_api import ResumableUploadException |
| 55 from gslib.cloud_api import ResumableUploadStartOverException | 55 from gslib.cloud_api import ResumableUploadStartOverException |
| 56 from gslib.cloud_api_helper import GetDownloadSerializationDict | 56 from gslib.cloud_api_helper import GetDownloadSerializationData |
| 57 from gslib.commands.compose import MAX_COMPOSE_ARITY | 57 from gslib.commands.compose import MAX_COMPOSE_ARITY |
| 58 from gslib.commands.config import DEFAULT_PARALLEL_COMPOSITE_UPLOAD_COMPONENT_SI
ZE | 58 from gslib.commands.config import DEFAULT_PARALLEL_COMPOSITE_UPLOAD_COMPONENT_SI
ZE |
| 59 from gslib.commands.config import DEFAULT_PARALLEL_COMPOSITE_UPLOAD_THRESHOLD | 59 from gslib.commands.config import DEFAULT_PARALLEL_COMPOSITE_UPLOAD_THRESHOLD |
| 60 from gslib.commands.config import DEFAULT_SLICED_OBJECT_DOWNLOAD_COMPONENT_SIZE |
| 61 from gslib.commands.config import DEFAULT_SLICED_OBJECT_DOWNLOAD_MAX_COMPONENTS |
| 62 from gslib.commands.config import DEFAULT_SLICED_OBJECT_DOWNLOAD_THRESHOLD |
| 60 from gslib.cs_api_map import ApiSelector | 63 from gslib.cs_api_map import ApiSelector |
| 61 from gslib.daisy_chain_wrapper import DaisyChainWrapper | 64 from gslib.daisy_chain_wrapper import DaisyChainWrapper |
| 62 from gslib.exception import CommandException | 65 from gslib.exception import CommandException |
| 63 from gslib.exception import HashMismatchException | 66 from gslib.exception import HashMismatchException |
| 64 from gslib.file_part import FilePart | 67 from gslib.file_part import FilePart |
| 65 from gslib.hashing_helper import Base64EncodeHash | 68 from gslib.hashing_helper import Base64EncodeHash |
| 66 from gslib.hashing_helper import CalculateB64EncodedMd5FromContents | 69 from gslib.hashing_helper import CalculateB64EncodedMd5FromContents |
| 67 from gslib.hashing_helper import CalculateHashesFromContents | 70 from gslib.hashing_helper import CalculateHashesFromContents |
| 71 from gslib.hashing_helper import CHECK_HASH_IF_FAST_ELSE_FAIL |
| 72 from gslib.hashing_helper import CHECK_HASH_NEVER |
| 73 from gslib.hashing_helper import ConcatCrc32c |
| 68 from gslib.hashing_helper import GetDownloadHashAlgs | 74 from gslib.hashing_helper import GetDownloadHashAlgs |
| 69 from gslib.hashing_helper import GetUploadHashAlgs | 75 from gslib.hashing_helper import GetUploadHashAlgs |
| 70 from gslib.hashing_helper import HashingFileUploadWrapper | 76 from gslib.hashing_helper import HashingFileUploadWrapper |
| 71 from gslib.parallelism_framework_util import ThreadAndProcessSafeDict | 77 from gslib.parallelism_framework_util import AtomicDict |
| 72 from gslib.parallelism_framework_util import ThreadSafeDict | |
| 73 from gslib.progress_callback import ConstructAnnounceText | 78 from gslib.progress_callback import ConstructAnnounceText |
| 74 from gslib.progress_callback import FileProgressCallbackHandler | 79 from gslib.progress_callback import FileProgressCallbackHandler |
| 75 from gslib.progress_callback import ProgressCallbackWithBackoff | 80 from gslib.progress_callback import ProgressCallbackWithBackoff |
| 76 from gslib.resumable_streaming_upload import ResumableStreamingJsonUploadWrapper | 81 from gslib.resumable_streaming_upload import ResumableStreamingJsonUploadWrapper |
| 77 from gslib.storage_url import ContainsWildcard | 82 from gslib.storage_url import ContainsWildcard |
| 78 from gslib.storage_url import StorageUrlFromString | 83 from gslib.storage_url import StorageUrlFromString |
| 79 from gslib.third_party.storage_apitools import storage_v1_messages as apitools_m
essages | 84 from gslib.third_party.storage_apitools import storage_v1_messages as apitools_m
essages |
| 85 from gslib.tracker_file import DeleteDownloadTrackerFiles |
| 80 from gslib.tracker_file import DeleteTrackerFile | 86 from gslib.tracker_file import DeleteTrackerFile |
| 81 from gslib.tracker_file import GetTrackerFilePath | 87 from gslib.tracker_file import GetTrackerFilePath |
| 82 from gslib.tracker_file import RaiseUnwritableTrackerFileException | 88 from gslib.tracker_file import RaiseUnwritableTrackerFileException |
| 83 from gslib.tracker_file import ReadOrCreateDownloadTrackerFile | 89 from gslib.tracker_file import ReadOrCreateDownloadTrackerFile |
| 84 from gslib.tracker_file import TrackerFileType | 90 from gslib.tracker_file import TrackerFileType |
| 91 from gslib.tracker_file import WriteDownloadComponentTrackerFile |
| 85 from gslib.translation_helper import AddS3MarkerAclToObjectMetadata | 92 from gslib.translation_helper import AddS3MarkerAclToObjectMetadata |
| 86 from gslib.translation_helper import CopyObjectMetadata | 93 from gslib.translation_helper import CopyObjectMetadata |
| 87 from gslib.translation_helper import DEFAULT_CONTENT_TYPE | 94 from gslib.translation_helper import DEFAULT_CONTENT_TYPE |
| 88 from gslib.translation_helper import GenerationFromUrlAndString | 95 from gslib.translation_helper import GenerationFromUrlAndString |
| 89 from gslib.translation_helper import ObjectMetadataFromHeaders | 96 from gslib.translation_helper import ObjectMetadataFromHeaders |
| 90 from gslib.translation_helper import PreconditionsFromHeaders | 97 from gslib.translation_helper import PreconditionsFromHeaders |
| 91 from gslib.translation_helper import S3MarkerAclFromObjectMetadata | 98 from gslib.translation_helper import S3MarkerAclFromObjectMetadata |
| 99 from gslib.util import CheckFreeSpace |
| 100 from gslib.util import CheckMultiprocessingAvailableAndInit |
| 92 from gslib.util import CreateLock | 101 from gslib.util import CreateLock |
| 93 from gslib.util import DEFAULT_FILE_BUFFER_SIZE | 102 from gslib.util import DEFAULT_FILE_BUFFER_SIZE |
| 103 from gslib.util import DivideAndCeil |
| 94 from gslib.util import GetCloudApiInstance | 104 from gslib.util import GetCloudApiInstance |
| 95 from gslib.util import GetFileSize | 105 from gslib.util import GetFileSize |
| 96 from gslib.util import GetJsonResumableChunkSize | 106 from gslib.util import GetJsonResumableChunkSize |
| 97 from gslib.util import GetMaxRetryDelay | 107 from gslib.util import GetMaxRetryDelay |
| 98 from gslib.util import GetNumRetries | 108 from gslib.util import GetNumRetries |
| 99 from gslib.util import GetStreamFromFileUrl | 109 from gslib.util import GetStreamFromFileUrl |
| 100 from gslib.util import HumanReadableToBytes | 110 from gslib.util import HumanReadableToBytes |
| 101 from gslib.util import IS_WINDOWS | 111 from gslib.util import IS_WINDOWS |
| 102 from gslib.util import IsCloudSubdirPlaceholder | 112 from gslib.util import IsCloudSubdirPlaceholder |
| 103 from gslib.util import MakeHumanReadable | 113 from gslib.util import MakeHumanReadable |
| 104 from gslib.util import MIN_SIZE_COMPUTE_LOGGING | 114 from gslib.util import MIN_SIZE_COMPUTE_LOGGING |
| 105 from gslib.util import MultiprocessingIsAvailable | |
| 106 from gslib.util import ResumableThreshold | 115 from gslib.util import ResumableThreshold |
| 107 from gslib.util import TEN_MIB | 116 from gslib.util import TEN_MIB |
| 117 from gslib.util import UsingCrcmodExtension |
| 108 from gslib.util import UTF8 | 118 from gslib.util import UTF8 |
| 109 from gslib.wildcard_iterator import CreateWildcardIterator | 119 from gslib.wildcard_iterator import CreateWildcardIterator |
| 110 | 120 |
| 111 # pylint: disable=g-import-not-at-top | 121 # pylint: disable=g-import-not-at-top |
| 112 if IS_WINDOWS: | 122 if IS_WINDOWS: |
| 113 import msvcrt | 123 import msvcrt |
| 114 from ctypes import c_int | |
| 115 from ctypes import c_uint64 | |
| 116 from ctypes import c_char_p | |
| 117 from ctypes import c_wchar_p | |
| 118 from ctypes import windll | |
| 119 from ctypes import POINTER | |
| 120 from ctypes import WINFUNCTYPE | |
| 121 from ctypes import WinError | |
| 122 | 124 |
| 123 # Declare copy_helper_opts as a global because namedtuple isn't aware of | 125 # Declare copy_helper_opts as a global because namedtuple isn't aware of |
| 124 # assigning to a class member (which breaks pickling done by multiprocessing). | 126 # assigning to a class member (which breaks pickling done by multiprocessing). |
| 125 # For details see | 127 # For details see |
| 126 # http://stackoverflow.com/questions/16377215/how-to-pickle-a-namedtuple-instanc
e-correctly | 128 # http://stackoverflow.com/questions/16377215/how-to-pickle-a-namedtuple-instanc
e-correctly |
| 127 # Similarly can't pickle logger. | |
| 128 # pylint: disable=global-at-module-level | 129 # pylint: disable=global-at-module-level |
| 129 global global_copy_helper_opts, global_logger | 130 global global_copy_helper_opts |
| 130 | 131 |
| 131 # In-memory map of local files that are currently opened for write. Used to | 132 # In-memory map of local files that are currently opened for write. Used to |
| 132 # ensure that if we write to the same file twice (say, for example, because the | 133 # ensure that if we write to the same file twice (say, for example, because the |
| 133 # user specified two identical source URLs), the writes occur serially. | 134 # user specified two identical source URLs), the writes occur serially. |
| 134 global open_files_map | 135 global open_files_map, open_files_lock |
| 135 open_files_map = ( | 136 open_files_map = ( |
| 136 ThreadSafeDict() if (IS_WINDOWS or not MultiprocessingIsAvailable()[0]) | 137 AtomicDict() if not CheckMultiprocessingAvailableAndInit().is_available |
| 137 else ThreadAndProcessSafeDict(multiprocessing.Manager())) | 138 else AtomicDict(manager=gslib.util.manager)) |
| 139 |
| 140 # We don't allow multiple processes on Windows, so using a process-safe lock |
| 141 # would be unnecessary. |
| 142 open_files_lock = CreateLock() |
| 138 | 143 |
| 139 # For debugging purposes; if True, files and objects that fail hash validation | 144 # For debugging purposes; if True, files and objects that fail hash validation |
| 140 # will be saved with the below suffix appended. | 145 # will be saved with the below suffix appended. |
| 141 _RENAME_ON_HASH_MISMATCH = False | 146 _RENAME_ON_HASH_MISMATCH = False |
| 142 _RENAME_ON_HASH_MISMATCH_SUFFIX = '_corrupt' | 147 _RENAME_ON_HASH_MISMATCH_SUFFIX = '_corrupt' |
| 143 | 148 |
| 144 PARALLEL_UPLOAD_TEMP_NAMESPACE = ( | 149 PARALLEL_UPLOAD_TEMP_NAMESPACE = ( |
| 145 u'/gsutil/tmp/parallel_composite_uploads/for_details_see/gsutil_help_cp/') | 150 u'/gsutil/tmp/parallel_composite_uploads/for_details_see/gsutil_help_cp/') |
| 146 | 151 |
| 147 PARALLEL_UPLOAD_STATIC_SALT = u""" | 152 PARALLEL_UPLOAD_STATIC_SALT = u""" |
| (...skipping 23 matching lines...) Expand all Loading... |
| 171 # canned_acl: canned_acl to apply to the uploaded file/component. | 176 # canned_acl: canned_acl to apply to the uploaded file/component. |
| 172 # content_type: content-type for final object, used for setting content-type | 177 # content_type: content-type for final object, used for setting content-type |
| 173 # of components and final object. | 178 # of components and final object. |
| 174 # tracker_file: tracker file for this component. | 179 # tracker_file: tracker file for this component. |
| 175 # tracker_file_lock: tracker file lock for tracker file(s). | 180 # tracker_file_lock: tracker file lock for tracker file(s). |
| 176 PerformParallelUploadFileToObjectArgs = namedtuple( | 181 PerformParallelUploadFileToObjectArgs = namedtuple( |
| 177 'PerformParallelUploadFileToObjectArgs', | 182 'PerformParallelUploadFileToObjectArgs', |
| 178 'filename file_start file_length src_url dst_url canned_acl ' | 183 'filename file_start file_length src_url dst_url canned_acl ' |
| 179 'content_type tracker_file tracker_file_lock') | 184 'content_type tracker_file tracker_file_lock') |
| 180 | 185 |
| 186 PerformSlicedDownloadObjectToFileArgs = namedtuple( |
| 187 'PerformSlicedDownloadObjectToFileArgs', |
| 188 'component_num src_url src_obj_metadata dst_url download_file_name ' |
| 189 'start_byte end_byte') |
| 190 |
| 191 PerformSlicedDownloadReturnValues = namedtuple( |
| 192 'PerformSlicedDownloadReturnValues', |
| 193 'component_num crc32c bytes_transferred server_encoding') |
| 194 |
| 181 ObjectFromTracker = namedtuple('ObjectFromTracker', | 195 ObjectFromTracker = namedtuple('ObjectFromTracker', |
| 182 'object_name generation') | 196 'object_name generation') |
| 183 | 197 |
| 184 # TODO: Refactor this file to be less cumbersome. In particular, some of the | 198 # TODO: Refactor this file to be less cumbersome. In particular, some of the |
| 185 # different paths (e.g., uploading a file to an object vs. downloading an | 199 # different paths (e.g., uploading a file to an object vs. downloading an |
| 186 # object to a file) could be split into separate files. | 200 # object to a file) could be split into separate files. |
| 187 | 201 |
| 188 # Chunk size to use while zipping/unzipping gzip files. | 202 # Chunk size to use while zipping/unzipping gzip files. |
| 189 GZIP_CHUNK_SIZE = 8192 | 203 GZIP_CHUNK_SIZE = 8192 |
| 190 | 204 |
| 205 # Number of bytes to wait before updating a sliced download component tracker |
| 206 # file. |
| 207 TRACKERFILE_UPDATE_THRESHOLD = TEN_MIB |
| 208 |
| 191 PARALLEL_COMPOSITE_SUGGESTION_THRESHOLD = 150 * 1024 * 1024 | 209 PARALLEL_COMPOSITE_SUGGESTION_THRESHOLD = 150 * 1024 * 1024 |
| 192 | 210 |
| 193 # S3 requires special Multipart upload logic (that we currently don't implement) | 211 # S3 requires special Multipart upload logic (that we currently don't implement) |
| 194 # for files > 5GiB in size. | 212 # for files > 5GiB in size. |
| 195 S3_MAX_UPLOAD_SIZE = 5 * 1024 * 1024 * 1024 | 213 S3_MAX_UPLOAD_SIZE = 5 * 1024 * 1024 * 1024 |
| 196 | 214 |
| 197 suggested_parallel_composites = False | 215 # TODO: Create a multiprocessing framework value allocator, then use it instead |
| 216 # of a dict. |
| 217 global suggested_sliced_transfers, suggested_sliced_transfers_lock |
| 218 suggested_sliced_transfers = ( |
| 219 AtomicDict() if not CheckMultiprocessingAvailableAndInit().is_available |
| 220 else AtomicDict(manager=gslib.util.manager)) |
| 221 suggested_sliced_transfers_lock = CreateLock() |
| 198 | 222 |
| 199 | 223 |
| 200 class FileConcurrencySkipError(Exception): | 224 class FileConcurrencySkipError(Exception): |
| 201 """Raised when skipping a file due to a concurrent, duplicate copy.""" | 225 """Raised when skipping a file due to a concurrent, duplicate copy.""" |
| 202 | 226 |
| 203 | 227 |
| 204 def _RmExceptionHandler(cls, e): | 228 def _RmExceptionHandler(cls, e): |
| 205 """Simple exception handler to allow post-completion status.""" | 229 """Simple exception handler to allow post-completion status.""" |
| 206 cls.logger.error(str(e)) | 230 cls.logger.error(str(e)) |
| 207 | 231 |
| 208 | 232 |
| 209 def _ParallelUploadCopyExceptionHandler(cls, e): | 233 def _ParallelCopyExceptionHandler(cls, e): |
| 210 """Simple exception handler to allow post-completion status.""" | 234 """Simple exception handler to allow post-completion status.""" |
| 211 cls.logger.error(str(e)) | 235 cls.logger.error(str(e)) |
| 212 cls.op_failure_count += 1 | 236 cls.op_failure_count += 1 |
| 213 cls.logger.debug('\n\nEncountered exception while copying:\n%s\n', | 237 cls.logger.debug('\n\nEncountered exception while copying:\n%s\n', |
| 214 traceback.format_exc()) | 238 traceback.format_exc()) |
| 215 | 239 |
| 216 | 240 |
| 217 def _PerformParallelUploadFileToObject(cls, args, thread_state=None): | 241 def _PerformParallelUploadFileToObject(cls, args, thread_state=None): |
| 218 """Function argument to Apply for performing parallel composite uploads. | 242 """Function argument to Apply for performing parallel composite uploads. |
| 219 | 243 |
| (...skipping 21 matching lines...) Expand all Loading... |
| 241 | 265 |
| 242 try: | 266 try: |
| 243 if global_copy_helper_opts.canned_acl: | 267 if global_copy_helper_opts.canned_acl: |
| 244 # No canned ACL support in JSON, force XML API to be used for | 268 # No canned ACL support in JSON, force XML API to be used for |
| 245 # upload/copy operations. | 269 # upload/copy operations. |
| 246 orig_prefer_api = gsutil_api.prefer_api | 270 orig_prefer_api = gsutil_api.prefer_api |
| 247 gsutil_api.prefer_api = ApiSelector.XML | 271 gsutil_api.prefer_api = ApiSelector.XML |
| 248 ret = _UploadFileToObject(args.src_url, fp, args.file_length, | 272 ret = _UploadFileToObject(args.src_url, fp, args.file_length, |
| 249 args.dst_url, dst_object_metadata, | 273 args.dst_url, dst_object_metadata, |
| 250 preconditions, gsutil_api, cls.logger, cls, | 274 preconditions, gsutil_api, cls.logger, cls, |
| 251 _ParallelUploadCopyExceptionHandler, | 275 _ParallelCopyExceptionHandler, |
| 252 gzip_exts=None, allow_splitting=False) | 276 gzip_exts=None, allow_splitting=False) |
| 253 finally: | 277 finally: |
| 254 if global_copy_helper_opts.canned_acl: | 278 if global_copy_helper_opts.canned_acl: |
| 255 gsutil_api.prefer_api = orig_prefer_api | 279 gsutil_api.prefer_api = orig_prefer_api |
| 256 | 280 |
| 257 component = ret[2] | 281 component = ret[2] |
| 258 _AppendComponentTrackerToParallelUploadTrackerFile( | 282 _AppendComponentTrackerToParallelUploadTrackerFile( |
| 259 args.tracker_file, component, args.tracker_file_lock) | 283 args.tracker_file, component, args.tracker_file_lock) |
| 260 return ret | 284 return ret |
| 261 | 285 |
| (...skipping 372 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 634 | 658 |
| 635 def _CreateDigestsFromDigesters(digesters): | 659 def _CreateDigestsFromDigesters(digesters): |
| 636 digests = {} | 660 digests = {} |
| 637 if digesters: | 661 if digesters: |
| 638 for alg in digesters: | 662 for alg in digesters: |
| 639 digests[alg] = base64.encodestring( | 663 digests[alg] = base64.encodestring( |
| 640 digesters[alg].digest()).rstrip('\n') | 664 digesters[alg].digest()).rstrip('\n') |
| 641 return digests | 665 return digests |
| 642 | 666 |
| 643 | 667 |
| 644 def _CreateDigestsFromLocalFile(logger, algs, file_name, src_obj_metadata): | 668 def _CreateDigestsFromLocalFile(logger, algs, file_name, final_file_name, |
| 669 src_obj_metadata): |
| 645 """Creates a base64 CRC32C and/or MD5 digest from file_name. | 670 """Creates a base64 CRC32C and/or MD5 digest from file_name. |
| 646 | 671 |
| 647 Args: | 672 Args: |
| 648 logger: for outputting log messages. | 673 logger: For outputting log messages. |
| 649 algs: list of algorithms to compute. | 674 algs: List of algorithms to compute. |
| 650 file_name: file to digest. | 675 file_name: File to digest. |
| 651 src_obj_metadata: metadta of source object. | 676 final_file_name: Permanent location to be used for the downloaded file |
| 677 after validation (used for logging). |
| 678 src_obj_metadata: Metadata of source object. |
| 652 | 679 |
| 653 Returns: | 680 Returns: |
| 654 Dict of algorithm name : base 64 encoded digest | 681 Dict of algorithm name : base 64 encoded digest |
| 655 """ | 682 """ |
| 656 hash_dict = {} | 683 hash_dict = {} |
| 657 if 'md5' in algs: | 684 if 'md5' in algs: |
| 658 if src_obj_metadata.size and src_obj_metadata.size > TEN_MIB: | |
| 659 logger.info( | |
| 660 'Computing MD5 for %s...', file_name) | |
| 661 hash_dict['md5'] = md5() | 685 hash_dict['md5'] = md5() |
| 662 if 'crc32c' in algs: | 686 if 'crc32c' in algs: |
| 663 hash_dict['crc32c'] = crcmod.predefined.Crc('crc-32c') | 687 hash_dict['crc32c'] = crcmod.predefined.Crc('crc-32c') |
| 664 with open(file_name, 'rb') as fp: | 688 with open(file_name, 'rb') as fp: |
| 665 CalculateHashesFromContents( | 689 CalculateHashesFromContents( |
| 666 fp, hash_dict, ProgressCallbackWithBackoff( | 690 fp, hash_dict, ProgressCallbackWithBackoff( |
| 667 src_obj_metadata.size, | 691 src_obj_metadata.size, |
| 668 FileProgressCallbackHandler( | 692 FileProgressCallbackHandler( |
| 669 ConstructAnnounceText('Hashing', file_name), logger).call)) | 693 ConstructAnnounceText('Hashing', final_file_name), |
| 694 logger).call)) |
| 670 digests = {} | 695 digests = {} |
| 671 for alg_name, digest in hash_dict.iteritems(): | 696 for alg_name, digest in hash_dict.iteritems(): |
| 672 digests[alg_name] = Base64EncodeHash(digest.hexdigest()) | 697 digests[alg_name] = Base64EncodeHash(digest.hexdigest()) |
| 673 return digests | 698 return digests |
| 674 | 699 |
| 675 | 700 |
| 676 def _CheckCloudHashes(logger, src_url, dst_url, src_obj_metadata, | 701 def _CheckCloudHashes(logger, src_url, dst_url, src_obj_metadata, |
| 677 dst_obj_metadata): | 702 dst_obj_metadata): |
| 678 """Validates integrity of two cloud objects copied via daisy-chain. | 703 """Validates integrity of two cloud objects copied via daisy-chain. |
| 679 | 704 |
| (...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 723 | 748 |
| 724 | 749 |
| 725 def _CheckHashes(logger, obj_url, obj_metadata, file_name, digests, | 750 def _CheckHashes(logger, obj_url, obj_metadata, file_name, digests, |
| 726 is_upload=False): | 751 is_upload=False): |
| 727 """Validates integrity by comparing cloud digest to local digest. | 752 """Validates integrity by comparing cloud digest to local digest. |
| 728 | 753 |
| 729 Args: | 754 Args: |
| 730 logger: for outputting log messages. | 755 logger: for outputting log messages. |
| 731 obj_url: CloudUrl for cloud object. | 756 obj_url: CloudUrl for cloud object. |
| 732 obj_metadata: Cloud Object being downloaded from or uploaded to. | 757 obj_metadata: Cloud Object being downloaded from or uploaded to. |
| 733 file_name: Local file name on disk being downloaded to or uploaded from. | 758 file_name: Local file name on disk being downloaded to or uploaded from |
| 759 (used only for logging). |
| 734 digests: Computed Digests for the object. | 760 digests: Computed Digests for the object. |
| 735 is_upload: If true, comparing for an uploaded object (controls logging). | 761 is_upload: If true, comparing for an uploaded object (controls logging). |
| 736 | 762 |
| 737 Raises: | 763 Raises: |
| 738 CommandException: if cloud digests don't match local digests. | 764 CommandException: if cloud digests don't match local digests. |
| 739 """ | 765 """ |
| 740 local_hashes = digests | 766 local_hashes = digests |
| 741 cloud_hashes = {} | 767 cloud_hashes = {} |
| 742 if obj_metadata.md5Hash: | 768 if obj_metadata.md5Hash: |
| 743 cloud_hashes['md5'] = obj_metadata.md5Hash.rstrip('\n') | 769 cloud_hashes['md5'] = obj_metadata.md5Hash.rstrip('\n') |
| (...skipping 234 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 978 request_components, dst_obj_metadata, preconditions=preconditions, | 1004 request_components, dst_obj_metadata, preconditions=preconditions, |
| 979 provider=dst_url.scheme, fields=['generation', 'crc32c', 'size']) | 1005 provider=dst_url.scheme, fields=['generation', 'crc32c', 'size']) |
| 980 | 1006 |
| 981 try: | 1007 try: |
| 982 # Make sure only to delete things that we know were successfully | 1008 # Make sure only to delete things that we know were successfully |
| 983 # uploaded (as opposed to all of the objects that we attempted to | 1009 # uploaded (as opposed to all of the objects that we attempted to |
| 984 # create) so that we don't delete any preexisting objects, except for | 1010 # create) so that we don't delete any preexisting objects, except for |
| 985 # those that were uploaded by a previous, failed run and have since | 1011 # those that were uploaded by a previous, failed run and have since |
| 986 # changed (but still have an old generation lying around). | 1012 # changed (but still have an old generation lying around). |
| 987 objects_to_delete = components + existing_objects_to_delete | 1013 objects_to_delete = components + existing_objects_to_delete |
| 988 command_obj.Apply(_DeleteObjectFn, objects_to_delete, _RmExceptionHandler, | 1014 command_obj.Apply( |
| 989 arg_checker=gslib.command.DummyArgChecker, | 1015 _DeleteTempComponentObjectFn, objects_to_delete, _RmExceptionHandler, |
| 990 parallel_operations_override=True) | 1016 arg_checker=gslib.command.DummyArgChecker, |
| 1017 parallel_operations_override=True) |
| 991 except Exception: # pylint: disable=broad-except | 1018 except Exception: # pylint: disable=broad-except |
| 992 # If some of the delete calls fail, don't cause the whole command to | 1019 # If some of the delete calls fail, don't cause the whole command to |
| 993 # fail. The copy was successful iff the compose call succeeded, so | 1020 # fail. The copy was successful iff the compose call succeeded, so |
| 994 # reduce this to a warning. | 1021 # reduce this to a warning. |
| 995 logging.warning( | 1022 logging.warning( |
| 996 'Failed to delete some of the following temporary objects:\n' + | 1023 'Failed to delete some of the following temporary objects:\n' + |
| 997 '\n'.join(dst_args.keys())) | 1024 '\n'.join(dst_args.keys())) |
| 998 finally: | 1025 finally: |
| 999 with tracker_file_lock: | 1026 with tracker_file_lock: |
| 1000 if os.path.exists(tracker_file): | 1027 if os.path.exists(tracker_file): |
| (...skipping 17 matching lines...) Expand all Loading... |
| 1018 logger: for outputting log messages. | 1045 logger: for outputting log messages. |
| 1019 allow_splitting: If false, then this function returns false. | 1046 allow_splitting: If false, then this function returns false. |
| 1020 src_url: FileUrl corresponding to a local file. | 1047 src_url: FileUrl corresponding to a local file. |
| 1021 dst_url: CloudUrl corresponding to destination cloud object. | 1048 dst_url: CloudUrl corresponding to destination cloud object. |
| 1022 file_size: The size of the source file, in bytes. | 1049 file_size: The size of the source file, in bytes. |
| 1023 canned_acl: Canned ACL to apply to destination object, if any. | 1050 canned_acl: Canned ACL to apply to destination object, if any. |
| 1024 | 1051 |
| 1025 Returns: | 1052 Returns: |
| 1026 True iff a parallel upload should be performed on the source file. | 1053 True iff a parallel upload should be performed on the source file. |
| 1027 """ | 1054 """ |
| 1028 global suggested_parallel_composites | 1055 global suggested_slice_transfers, suggested_sliced_transfers_lock |
| 1029 parallel_composite_upload_threshold = HumanReadableToBytes(config.get( | 1056 parallel_composite_upload_threshold = HumanReadableToBytes(config.get( |
| 1030 'GSUtil', 'parallel_composite_upload_threshold', | 1057 'GSUtil', 'parallel_composite_upload_threshold', |
| 1031 DEFAULT_PARALLEL_COMPOSITE_UPLOAD_THRESHOLD)) | 1058 DEFAULT_PARALLEL_COMPOSITE_UPLOAD_THRESHOLD)) |
| 1032 | 1059 |
| 1033 all_factors_but_size = ( | 1060 all_factors_but_size = ( |
| 1034 allow_splitting # Don't split the pieces multiple times. | 1061 allow_splitting # Don't split the pieces multiple times. |
| 1035 and not src_url.IsStream() # We can't partition streams. | 1062 and not src_url.IsStream() # We can't partition streams. |
| 1036 and dst_url.scheme == 'gs' # Compose is only for gs. | 1063 and dst_url.scheme == 'gs' # Compose is only for gs. |
| 1037 and not canned_acl) # TODO: Implement canned ACL support for compose. | 1064 and not canned_acl) # TODO: Implement canned ACL support for compose. |
| 1038 | 1065 |
| 1039 # Since parallel composite uploads are disabled by default, make user aware of | 1066 # Since parallel composite uploads are disabled by default, make user aware of |
| 1040 # them. | 1067 # them. |
| 1041 # TODO: Once compiled crcmod is being distributed by major Linux distributions | 1068 # TODO: Once compiled crcmod is being distributed by major Linux distributions |
| 1042 # remove this check. | 1069 # remove this check. |
| 1043 if (all_factors_but_size and parallel_composite_upload_threshold == 0 | 1070 if (all_factors_but_size and parallel_composite_upload_threshold == 0 |
| 1044 and file_size >= PARALLEL_COMPOSITE_SUGGESTION_THRESHOLD | 1071 and file_size >= PARALLEL_COMPOSITE_SUGGESTION_THRESHOLD): |
| 1045 and not suggested_parallel_composites): | 1072 with suggested_sliced_transfers_lock: |
| 1046 logger.info('\n'.join(textwrap.wrap( | 1073 if not suggested_sliced_transfers.get('suggested'): |
| 1047 '==> NOTE: You are uploading one or more large file(s), which would ' | 1074 logger.info('\n'.join(textwrap.wrap( |
| 1048 'run significantly faster if you enable parallel composite uploads. ' | 1075 '==> NOTE: You are uploading one or more large file(s), which ' |
| 1049 'This feature can be enabled by editing the ' | 1076 'would run significantly faster if you enable parallel composite ' |
| 1050 '"parallel_composite_upload_threshold" value in your .boto ' | 1077 'uploads. This feature can be enabled by editing the ' |
| 1051 'configuration file. However, note that if you do this you and any ' | 1078 '"parallel_composite_upload_threshold" value in your .boto ' |
| 1052 'users that download such composite files will need to have a compiled ' | 1079 'configuration file. However, note that if you do this you and any ' |
| 1053 'crcmod installed (see "gsutil help crcmod").')) + '\n') | 1080 'users that download such composite files will need to have a ' |
| 1054 suggested_parallel_composites = True | 1081 'compiled crcmod installed (see "gsutil help crcmod").')) + '\n') |
| 1082 suggested_sliced_transfers['suggested'] = True |
| 1055 | 1083 |
| 1056 return (all_factors_but_size | 1084 return (all_factors_but_size |
| 1057 and parallel_composite_upload_threshold > 0 | 1085 and parallel_composite_upload_threshold > 0 |
| 1058 and file_size >= parallel_composite_upload_threshold) | 1086 and file_size >= parallel_composite_upload_threshold) |
| 1059 | 1087 |
| 1060 | 1088 |
| 1061 def ExpandUrlToSingleBlr(url_str, gsutil_api, debug, project_id, | 1089 def ExpandUrlToSingleBlr(url_str, gsutil_api, debug, project_id, |
| 1062 treat_nonexistent_object_as_subdir=False): | 1090 treat_nonexistent_object_as_subdir=False): |
| 1063 """Expands wildcard if present in url_str. | 1091 """Expands wildcard if present in url_str. |
| 1064 | 1092 |
| (...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1097 storage_url = StorageUrlFromString(url_str) | 1125 storage_url = StorageUrlFromString(url_str) |
| 1098 | 1126 |
| 1099 # Handle non-wildcarded URL. | 1127 # Handle non-wildcarded URL. |
| 1100 if storage_url.IsFileUrl(): | 1128 if storage_url.IsFileUrl(): |
| 1101 return (storage_url, storage_url.IsDirectory()) | 1129 return (storage_url, storage_url.IsDirectory()) |
| 1102 | 1130 |
| 1103 # At this point we have a cloud URL. | 1131 # At this point we have a cloud URL. |
| 1104 if storage_url.IsBucket(): | 1132 if storage_url.IsBucket(): |
| 1105 return (storage_url, True) | 1133 return (storage_url, True) |
| 1106 | 1134 |
| 1107 # For object/prefix URLs check 3 cases: (a) if the name ends with '/' treat | 1135 # For object/prefix URLs, there are four cases that indicate the destination |
| 1108 # as a subdir; otherwise, use the wildcard iterator with url to | 1136 # is a cloud subdirectory; these are always considered to be an existing |
| 1109 # find if (b) there's a Prefix matching url, or (c) name is of form | 1137 # container. Checking each case allows gsutil to provide Unix-like |
| 1110 # dir_$folder$ (and in both these cases also treat dir as a subdir). | 1138 # destination folder semantics, but requires up to three HTTP calls, noted |
| 1111 # Cloud subdirs are always considered to be an existing container. | 1139 # below. |
| 1140 |
| 1141 # Case 1: If a placeholder object ending with '/' exists. |
| 1112 if IsCloudSubdirPlaceholder(storage_url): | 1142 if IsCloudSubdirPlaceholder(storage_url): |
| 1113 return (storage_url, True) | 1143 return (storage_url, True) |
| 1114 | 1144 |
| 1115 # Check for the special case where we have a folder marker object. | 1145 # HTTP call to make an eventually consistent check for a matching prefix, |
| 1116 folder_expansion = CreateWildcardIterator( | 1146 # _$folder$, or empty listing. |
| 1117 storage_url.versionless_url_string + '_$folder$', gsutil_api, | 1147 expansion_empty = True |
| 1118 debug=debug, project_id=project_id).IterAll( | 1148 list_iterator = gsutil_api.ListObjects( |
| 1119 bucket_listing_fields=['name']) | 1149 storage_url.bucket_name, prefix=storage_url.object_name, delimiter='/', |
| 1120 for blr in folder_expansion: | 1150 provider=storage_url.scheme, fields=['prefixes', 'items/name']) |
| 1121 return (storage_url, True) | 1151 for obj_or_prefix in list_iterator: |
| 1152 # To conserve HTTP calls for the common case, we make a single listing |
| 1153 # that covers prefixes and object names. Listing object names covers the |
| 1154 # _$folder$ case and the nonexistent-object-as-subdir case. However, if |
| 1155 # there are many existing objects for which the target URL is an exact |
| 1156 # prefix, this listing could be paginated and span multiple HTTP calls. |
| 1157 # If this case becomes common, we could heuristically abort the |
| 1158 # listing operation after the first page of results and just query for the |
| 1159 # _$folder$ object directly using GetObjectMetadata. |
| 1160 expansion_empty = False |
| 1122 | 1161 |
| 1123 blr_expansion = CreateWildcardIterator(url_str, gsutil_api, | 1162 if obj_or_prefix.datatype == CloudApi.CsObjectOrPrefixType.PREFIX: |
| 1124 debug=debug, | 1163 # Case 2: If there is a matching prefix when listing the destination URL. |
| 1125 project_id=project_id).IterAll( | 1164 return (storage_url, True) |
| 1126 bucket_listing_fields=['name']) | 1165 elif (obj_or_prefix.datatype == CloudApi.CsObjectOrPrefixType.OBJECT and |
| 1127 expansion_empty = True | 1166 obj_or_prefix.data.name == storage_url.object_name + '_$folder$'): |
| 1128 for blr in blr_expansion: | 1167 # Case 3: If a placeholder object matching destination + _$folder$ |
| 1129 expansion_empty = False | 1168 # exists. |
| 1130 if blr.IsPrefix(): | |
| 1131 return (storage_url, True) | 1169 return (storage_url, True) |
| 1132 | 1170 |
| 1133 return (storage_url, | 1171 # Case 4: If no objects/prefixes matched, and nonexistent objects should be |
| 1134 expansion_empty and treat_nonexistent_object_as_subdir) | 1172 # treated as subdirectories. |
| 1173 return (storage_url, expansion_empty and treat_nonexistent_object_as_subdir) |
| 1135 | 1174 |
| 1136 | 1175 |
| 1137 def FixWindowsNaming(src_url, dst_url): | 1176 def FixWindowsNaming(src_url, dst_url): |
| 1138 """Translates Windows pathnames to cloud pathnames. | 1177 """Translates Windows pathnames to cloud pathnames. |
| 1139 | 1178 |
| 1140 Rewrites the destination URL built by ConstructDstUrl(). | 1179 Rewrites the destination URL built by ConstructDstUrl(). |
| 1141 | 1180 |
| 1142 Args: | 1181 Args: |
| 1143 src_url: Source StorageUrl to be copied. | 1182 src_url: Source StorageUrl to be copied. |
| 1144 dst_url: The destination StorageUrl built by ConstructDstUrl(). | 1183 dst_url: The destination StorageUrl built by ConstructDstUrl(). |
| (...skipping 90 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1235 end_time = time.time() | 1274 end_time = time.time() |
| 1236 | 1275 |
| 1237 result_url = dst_url.Clone() | 1276 result_url = dst_url.Clone() |
| 1238 result_url.generation = GenerationFromUrlAndString(result_url, | 1277 result_url.generation = GenerationFromUrlAndString(result_url, |
| 1239 dst_obj.generation) | 1278 dst_obj.generation) |
| 1240 | 1279 |
| 1241 return (end_time - start_time, src_obj_metadata.size, result_url, | 1280 return (end_time - start_time, src_obj_metadata.size, result_url, |
| 1242 dst_obj.md5Hash) | 1281 dst_obj.md5Hash) |
| 1243 | 1282 |
| 1244 | 1283 |
| 1245 def _CheckFreeSpace(path): | |
| 1246 """Return path/drive free space (in bytes).""" | |
| 1247 if IS_WINDOWS: | |
| 1248 # pylint: disable=g-import-not-at-top | |
| 1249 try: | |
| 1250 # pylint: disable=invalid-name | |
| 1251 get_disk_free_space_ex = WINFUNCTYPE(c_int, c_wchar_p, | |
| 1252 POINTER(c_uint64), | |
| 1253 POINTER(c_uint64), | |
| 1254 POINTER(c_uint64)) | |
| 1255 get_disk_free_space_ex = get_disk_free_space_ex( | |
| 1256 ('GetDiskFreeSpaceExW', windll.kernel32), ( | |
| 1257 (1, 'lpszPathName'), | |
| 1258 (2, 'lpFreeUserSpace'), | |
| 1259 (2, 'lpTotalSpace'), | |
| 1260 (2, 'lpFreeSpace'),)) | |
| 1261 except AttributeError: | |
| 1262 get_disk_free_space_ex = WINFUNCTYPE(c_int, c_char_p, | |
| 1263 POINTER(c_uint64), | |
| 1264 POINTER(c_uint64), | |
| 1265 POINTER(c_uint64)) | |
| 1266 get_disk_free_space_ex = get_disk_free_space_ex( | |
| 1267 ('GetDiskFreeSpaceExA', windll.kernel32), ( | |
| 1268 (1, 'lpszPathName'), | |
| 1269 (2, 'lpFreeUserSpace'), | |
| 1270 (2, 'lpTotalSpace'), | |
| 1271 (2, 'lpFreeSpace'),)) | |
| 1272 | |
| 1273 def GetDiskFreeSpaceExErrCheck(result, unused_func, args): | |
| 1274 if not result: | |
| 1275 raise WinError() | |
| 1276 return args[1].value | |
| 1277 get_disk_free_space_ex.errcheck = GetDiskFreeSpaceExErrCheck | |
| 1278 | |
| 1279 return get_disk_free_space_ex(os.getenv('SystemDrive')) | |
| 1280 else: | |
| 1281 (_, f_frsize, _, _, f_bavail, _, _, _, _, _) = os.statvfs(path) | |
| 1282 return f_frsize * f_bavail | |
| 1283 | |
| 1284 | |
| 1285 def _SetContentTypeFromFile(src_url, dst_obj_metadata): | 1284 def _SetContentTypeFromFile(src_url, dst_obj_metadata): |
| 1286 """Detects and sets Content-Type if src_url names a local file. | 1285 """Detects and sets Content-Type if src_url names a local file. |
| 1287 | 1286 |
| 1288 Args: | 1287 Args: |
| 1289 src_url: Source StorageUrl. | 1288 src_url: Source StorageUrl. |
| 1290 dst_obj_metadata: Object-specific metadata that should be overridden during | 1289 dst_obj_metadata: Object-specific metadata that should be overridden during |
| 1291 the copy. | 1290 the copy. |
| 1292 """ | 1291 """ |
| 1293 # contentType == '' if user requested default type. | 1292 # contentType == '' if user requested default type. |
| 1294 if (dst_obj_metadata.contentType is None and src_url.IsFileUrl() | 1293 if (dst_obj_metadata.contentType is None and src_url.IsFileUrl() |
| (...skipping 207 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1502 # TODO: Compress using a streaming model as opposed to all at once here. | 1501 # TODO: Compress using a streaming model as opposed to all at once here. |
| 1503 if src_obj_size >= MIN_SIZE_COMPUTE_LOGGING: | 1502 if src_obj_size >= MIN_SIZE_COMPUTE_LOGGING: |
| 1504 logger.info( | 1503 logger.info( |
| 1505 'Compressing %s (to tmp)...', src_url) | 1504 'Compressing %s (to tmp)...', src_url) |
| 1506 (gzip_fh, gzip_path) = tempfile.mkstemp() | 1505 (gzip_fh, gzip_path) = tempfile.mkstemp() |
| 1507 gzip_fp = None | 1506 gzip_fp = None |
| 1508 try: | 1507 try: |
| 1509 # Check for temp space. Assume the compressed object is at most 2x | 1508 # Check for temp space. Assume the compressed object is at most 2x |
| 1510 # the size of the object (normally should compress to smaller than | 1509 # the size of the object (normally should compress to smaller than |
| 1511 # the object) | 1510 # the object) |
| 1512 if _CheckFreeSpace(gzip_path) < 2*int(src_obj_size): | 1511 if CheckFreeSpace(gzip_path) < 2*int(src_obj_size): |
| 1513 raise CommandException('Inadequate temp space available to compress ' | 1512 raise CommandException('Inadequate temp space available to compress ' |
| 1514 '%s. See the CHANGING TEMP DIRECTORIES section ' | 1513 '%s. See the CHANGING TEMP DIRECTORIES section ' |
| 1515 'of "gsutil help cp" for more info.' % src_url) | 1514 'of "gsutil help cp" for more info.' % src_url) |
| 1516 gzip_fp = gzip.open(gzip_path, 'wb') | 1515 gzip_fp = gzip.open(gzip_path, 'wb') |
| 1517 data = src_obj_filestream.read(GZIP_CHUNK_SIZE) | 1516 data = src_obj_filestream.read(GZIP_CHUNK_SIZE) |
| 1518 while data: | 1517 while data: |
| 1519 gzip_fp.write(data) | 1518 gzip_fp.write(data) |
| 1520 data = src_obj_filestream.read(GZIP_CHUNK_SIZE) | 1519 data = src_obj_filestream.read(GZIP_CHUNK_SIZE) |
| 1521 finally: | 1520 finally: |
| 1522 if gzip_fp: | 1521 if gzip_fp: |
| (...skipping 127 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1650 result_url = dst_url.Clone() | 1649 result_url = dst_url.Clone() |
| 1651 | 1650 |
| 1652 result_url.generation = uploaded_object.generation | 1651 result_url.generation = uploaded_object.generation |
| 1653 result_url.generation = GenerationFromUrlAndString( | 1652 result_url.generation = GenerationFromUrlAndString( |
| 1654 result_url, uploaded_object.generation) | 1653 result_url, uploaded_object.generation) |
| 1655 | 1654 |
| 1656 return (elapsed_time, uploaded_object.size, result_url, | 1655 return (elapsed_time, uploaded_object.size, result_url, |
| 1657 uploaded_object.md5Hash) | 1656 uploaded_object.md5Hash) |
| 1658 | 1657 |
| 1659 | 1658 |
| 1660 # TODO: Refactor this long function into smaller pieces. | 1659 def _GetDownloadFile(dst_url, src_obj_metadata, logger): |
| 1661 # pylint: disable=too-many-statements | 1660 """Creates a new download file, and deletes the file that will be replaced. |
| 1662 def _DownloadObjectToFile(src_url, src_obj_metadata, dst_url, | 1661 |
| 1663 gsutil_api, logger, test_method=None): | 1662 Names and creates a temporary file for this download. Also, if there is an |
| 1664 """Downloads an object to a local file. | 1663 existing file at the path where this file will be placed after the download |
| 1664 is completed, that file will be deleted. |
| 1665 | 1665 |
| 1666 Args: | 1666 Args: |
| 1667 src_url: Source CloudUrl. | 1667 dst_url: Destination FileUrl. |
| 1668 src_obj_metadata: Metadata from the source object. | 1668 src_obj_metadata: Metadata from the source object. |
| 1669 dst_url: Destination FileUrl. | |
| 1670 gsutil_api: gsutil Cloud API instance to use for the download. | |
| 1671 logger: for outputting log messages. | 1669 logger: for outputting log messages. |
| 1672 test_method: Optional test method for modifying the file before validation | 1670 |
| 1673 during unit tests. | |
| 1674 Returns: | 1671 Returns: |
| 1675 (elapsed_time, bytes_transferred, dst_url, md5), excluding overhead like | 1672 (download_file_name, need_to_unzip) |
| 1676 initial GET. | 1673 download_file_name: The name of the temporary file to which the object will |
| 1677 | 1674 be downloaded. |
| 1678 Raises: | 1675 need_to_unzip: If true, a temporary zip file was used and must be |
| 1679 CommandException: if errors encountered. | 1676 uncompressed as part of validation. |
| 1680 """ | 1677 """ |
| 1681 global open_files_map | 1678 dir_name = os.path.dirname(dst_url.object_name) |
| 1682 file_name = dst_url.object_name | |
| 1683 dir_name = os.path.dirname(file_name) | |
| 1684 if dir_name and not os.path.exists(dir_name): | 1679 if dir_name and not os.path.exists(dir_name): |
| 1685 # Do dir creation in try block so can ignore case where dir already | 1680 # Do dir creation in try block so can ignore case where dir already |
| 1686 # exists. This is needed to avoid a race condition when running gsutil | 1681 # exists. This is needed to avoid a race condition when running gsutil |
| 1687 # -m cp. | 1682 # -m cp. |
| 1688 try: | 1683 try: |
| 1689 os.makedirs(dir_name) | 1684 os.makedirs(dir_name) |
| 1690 except OSError, e: | 1685 except OSError, e: |
| 1691 if e.errno != errno.EEXIST: | 1686 if e.errno != errno.EEXIST: |
| 1692 raise | 1687 raise |
| 1693 api_selector = gsutil_api.GetApiSelector(provider=src_url.scheme) | 1688 |
| 1689 need_to_unzip = False |
| 1694 # For gzipped objects download to a temp file and unzip. For the XML API, | 1690 # For gzipped objects download to a temp file and unzip. For the XML API, |
| 1695 # the represents the result of a HEAD request. For the JSON API, this is | 1691 # this represents the result of a HEAD request. For the JSON API, this is |
| 1696 # the stored encoding which the service may not respect. However, if the | 1692 # the stored encoding which the service may not respect. However, if the |
| 1697 # server sends decompressed bytes for a file that is stored compressed | 1693 # server sends decompressed bytes for a file that is stored compressed |
| 1698 # (double compressed case), there is no way we can validate the hash and | 1694 # (double compressed case), there is no way we can validate the hash and |
| 1699 # we will fail our hash check for the object. | 1695 # we will fail our hash check for the object. |
| 1700 if (src_obj_metadata.contentEncoding and | 1696 if (src_obj_metadata.contentEncoding and |
| 1701 src_obj_metadata.contentEncoding.lower().endswith('gzip')): | 1697 src_obj_metadata.contentEncoding.lower().endswith('gzip')): |
| 1702 # We can't use tempfile.mkstemp() here because we need a predictable | 1698 need_to_unzip = True |
| 1703 # filename for resumable downloads. | 1699 download_file_name = _GetDownloadTempZipFileName(dst_url) |
| 1704 download_file_name = _GetDownloadZipFileName(file_name) | |
| 1705 logger.info( | 1700 logger.info( |
| 1706 'Downloading to temp gzip filename %s', download_file_name) | 1701 'Downloading to temp gzip filename %s', download_file_name) |
| 1707 need_to_unzip = True | |
| 1708 else: | 1702 else: |
| 1709 download_file_name = file_name | 1703 download_file_name = _GetDownloadTempFileName(dst_url) |
| 1710 need_to_unzip = False | 1704 |
| 1711 | 1705 # If a file exists at the permanent destination (where the file will be moved |
| 1712 if download_file_name.endswith(dst_url.delim): | 1706 # after the download is completed), delete it here to reduce disk space |
| 1713 logger.warn('\n'.join(textwrap.wrap( | 1707 # requirements. |
| 1714 'Skipping attempt to download to filename ending with slash (%s). This ' | 1708 if os.path.exists(dst_url.object_name): |
| 1715 'typically happens when using gsutil to download from a subdirectory ' | 1709 os.unlink(dst_url.object_name) |
| 1716 'created by the Cloud Console (https://cloud.google.com/console)' | 1710 |
| 1717 % download_file_name))) | 1711 # Downloads open the temporary download file in r+b mode, which requires it |
| 1718 return (0, 0, dst_url, '') | 1712 # to already exist, so we create it here if it doesn't exist already. |
| 1719 | 1713 fp = open(download_file_name, 'ab') |
| 1720 # Set up hash digesters. | 1714 fp.close() |
| 1715 return download_file_name, need_to_unzip |
| 1716 |
| 1717 |
| 1718 def _ShouldDoSlicedDownload(download_strategy, src_obj_metadata, |
| 1719 allow_splitting, logger): |
| 1720 """Determines whether the sliced download strategy should be used. |
| 1721 |
| 1722 Args: |
| 1723 download_strategy: CloudApi download strategy. |
| 1724 src_obj_metadata: Metadata from the source object. |
| 1725 allow_splitting: If false, then this function returns false. |
| 1726 logger: logging.Logger for log message output. |
| 1727 |
| 1728 Returns: |
| 1729 True iff a sliced download should be performed on the source file. |
| 1730 """ |
| 1731 sliced_object_download_threshold = HumanReadableToBytes(config.get( |
| 1732 'GSUtil', 'sliced_object_download_threshold', |
| 1733 DEFAULT_SLICED_OBJECT_DOWNLOAD_THRESHOLD)) |
| 1734 |
| 1735 max_components = config.getint( |
| 1736 'GSUtil', 'sliced_object_download_max_components', |
| 1737 DEFAULT_SLICED_OBJECT_DOWNLOAD_MAX_COMPONENTS) |
| 1738 |
| 1739 # Don't use sliced download if it will prevent us from performing an |
| 1740 # integrity check. |
| 1741 check_hashes_config = config.get( |
| 1742 'GSUtil', 'check_hashes', CHECK_HASH_IF_FAST_ELSE_FAIL) |
| 1743 parallel_hashing = src_obj_metadata.crc32c and UsingCrcmodExtension(crcmod) |
| 1744 hashing_okay = parallel_hashing or check_hashes_config == CHECK_HASH_NEVER |
| 1745 |
| 1746 use_slice = ( |
| 1747 allow_splitting |
| 1748 and download_strategy is not CloudApi.DownloadStrategy.ONE_SHOT |
| 1749 and max_components > 1 |
| 1750 and hashing_okay |
| 1751 and sliced_object_download_threshold > 0 |
| 1752 and src_obj_metadata.size >= sliced_object_download_threshold) |
| 1753 |
| 1754 if (not use_slice |
| 1755 and src_obj_metadata.size >= PARALLEL_COMPOSITE_SUGGESTION_THRESHOLD |
| 1756 and not UsingCrcmodExtension(crcmod) |
| 1757 and check_hashes_config != CHECK_HASH_NEVER): |
| 1758 with suggested_sliced_transfers_lock: |
| 1759 if not suggested_sliced_transfers.get('suggested'): |
| 1760 logger.info('\n'.join(textwrap.wrap( |
| 1761 '==> NOTE: You are downloading one or more large file(s), which ' |
| 1762 'would run significantly faster if you enabled sliced object ' |
| 1763 'downloads. This feature is enabled by default but requires that ' |
| 1764 'compiled crcmod be installed (see "gsutil help crcmod").')) + '\n') |
| 1765 suggested_sliced_transfers['suggested'] = True |
| 1766 |
| 1767 return use_slice |
| 1768 |
| 1769 |
| 1770 def _PerformSlicedDownloadObjectToFile(cls, args, thread_state=None): |
| 1771 """Function argument to Apply for performing sliced downloads. |
| 1772 |
| 1773 Args: |
| 1774 cls: Calling Command class. |
| 1775 args: PerformSlicedDownloadObjectToFileArgs tuple describing the target. |
| 1776 thread_state: gsutil Cloud API instance to use for the operation. |
| 1777 |
| 1778 Returns: |
| 1779 PerformSlicedDownloadReturnValues named-tuple filled with: |
| 1780 component_num: The component number for this download. |
| 1781 crc32c: CRC32C hash value (integer) of the downloaded bytes. |
| 1782 bytes_transferred: The number of bytes transferred, potentially less |
| 1783 than the component size if the download was resumed. |
| 1784 """ |
| 1785 gsutil_api = GetCloudApiInstance(cls, thread_state=thread_state) |
| 1721 hash_algs = GetDownloadHashAlgs( | 1786 hash_algs = GetDownloadHashAlgs( |
| 1722 logger, src_has_md5=src_obj_metadata.md5Hash, | 1787 cls.logger, consider_crc32c=args.src_obj_metadata.crc32c) |
| 1723 src_has_crc32c=src_obj_metadata.crc32c) | |
| 1724 digesters = dict((alg, hash_algs[alg]()) for alg in hash_algs or {}) | 1788 digesters = dict((alg, hash_algs[alg]()) for alg in hash_algs or {}) |
| 1725 | 1789 |
| 1726 fp = None | 1790 (bytes_transferred, server_encoding) = ( |
| 1727 # Tracks whether the server used a gzip encoding. | 1791 _DownloadObjectToFileResumable(args.src_url, args.src_obj_metadata, |
| 1792 args.dst_url, args.download_file_name, |
| 1793 gsutil_api, cls.logger, digesters, |
| 1794 component_num=args.component_num, |
| 1795 start_byte=args.start_byte, |
| 1796 end_byte=args.end_byte)) |
| 1797 |
| 1798 crc32c_val = None |
| 1799 if 'crc32c' in digesters: |
| 1800 crc32c_val = digesters['crc32c'].crcValue |
| 1801 return PerformSlicedDownloadReturnValues( |
| 1802 args.component_num, crc32c_val, bytes_transferred, server_encoding) |
| 1803 |
| 1804 |
| 1805 def _MaintainSlicedDownloadTrackerFiles(src_obj_metadata, dst_url, |
| 1806 download_file_name, logger, |
| 1807 api_selector, num_components): |
| 1808 """Maintains sliced download tracker files in order to permit resumability. |
| 1809 |
| 1810 Reads or creates a sliced download tracker file representing this object |
| 1811 download. Upon an attempt at cross-process resumption, the contents of the |
| 1812 sliced download tracker file are verified to make sure a resumption is |
| 1813 possible and appropriate. In the case that a resumption should not be |
| 1814 attempted, existing component tracker files are deleted (to prevent child |
| 1815 processes from attempting resumption), and a new sliced download tracker |
| 1816 file is created. |
| 1817 |
| 1818 Args: |
| 1819 src_obj_metadata: Metadata from the source object. Must include etag and |
| 1820 generation. |
| 1821 dst_url: Destination FileUrl. |
| 1822 download_file_name: Temporary file name to be used for the download. |
| 1823 logger: for outputting log messages. |
| 1824 api_selector: The Cloud API implementation used. |
| 1825 num_components: The number of components to perform this download with. |
| 1826 """ |
| 1827 assert src_obj_metadata.etag |
| 1828 tracker_file = None |
| 1829 |
| 1830 # Only can happen if the resumable threshold is set higher than the |
| 1831 # parallel transfer threshold. |
| 1832 if src_obj_metadata.size < ResumableThreshold(): |
| 1833 return |
| 1834 |
| 1835 tracker_file_name = GetTrackerFilePath(dst_url, |
| 1836 TrackerFileType.SLICED_DOWNLOAD, |
| 1837 api_selector) |
| 1838 |
| 1839 # Check to see if we should attempt resuming the download. |
| 1840 try: |
| 1841 fp = open(download_file_name, 'rb') |
| 1842 existing_file_size = GetFileSize(fp) |
| 1843 # A parallel resumption should be attempted only if the destination file |
| 1844 # size is exactly the same as the source size and the tracker file matches. |
| 1845 if existing_file_size == src_obj_metadata.size: |
| 1846 tracker_file = open(tracker_file_name, 'r') |
| 1847 tracker_file_data = json.load(tracker_file) |
| 1848 if (tracker_file_data['etag'] == src_obj_metadata.etag and |
| 1849 tracker_file_data['generation'] == src_obj_metadata.generation and |
| 1850 tracker_file_data['num_components'] == num_components): |
| 1851 return |
| 1852 else: |
| 1853 tracker_file.close() |
| 1854 logger.warn('Sliced download tracker file doesn\'t match for ' |
| 1855 'download of %s. Restarting download from scratch.' % |
| 1856 dst_url.object_name) |
| 1857 |
| 1858 except (IOError, ValueError) as e: |
| 1859 # Ignore non-existent file (happens first time a download |
| 1860 # is attempted on an object), but warn user for other errors. |
| 1861 if isinstance(e, ValueError) or e.errno != errno.ENOENT: |
| 1862 logger.warn('Couldn\'t read sliced download tracker file (%s): %s. ' |
| 1863 'Restarting download from scratch.' % |
| 1864 (tracker_file_name, str(e))) |
| 1865 finally: |
| 1866 if fp: |
| 1867 fp.close() |
| 1868 if tracker_file: |
| 1869 tracker_file.close() |
| 1870 |
| 1871 # Delete component tracker files to guarantee download starts from scratch. |
| 1872 DeleteDownloadTrackerFiles(dst_url, api_selector) |
| 1873 |
| 1874 # Create a new sliced download tracker file to represent this download. |
| 1875 try: |
| 1876 with open(tracker_file_name, 'w') as tracker_file: |
| 1877 tracker_file_data = {'etag': src_obj_metadata.etag, |
| 1878 'generation': src_obj_metadata.generation, |
| 1879 'num_components': num_components} |
| 1880 tracker_file.write(json.dumps(tracker_file_data)) |
| 1881 except IOError as e: |
| 1882 RaiseUnwritableTrackerFileException(tracker_file_name, e.strerror) |
| 1883 |
| 1884 |
| 1885 class SlicedDownloadFileWrapper(object): |
| 1886 """Wraps a file object to be used in GetObjectMedia for sliced downloads. |
| 1887 |
| 1888 In order to allow resumability, the file object used by each thread in a |
| 1889 sliced object download should be wrapped using SlicedDownloadFileWrapper. |
| 1890 Passing a SlicedDownloadFileWrapper object to GetObjectMedia will allow the |
| 1891 download component tracker file for this component to be updated periodically, |
| 1892 while the downloaded bytes are normally written to file. |
| 1893 """ |
| 1894 |
| 1895 def __init__(self, fp, tracker_file_name, src_obj_metadata, start_byte, |
| 1896 end_byte): |
| 1897 """Initializes the SlicedDownloadFileWrapper. |
| 1898 |
| 1899 Args: |
| 1900 fp: The already-open file object to be used for writing in |
| 1901 GetObjectMedia. Data will be written to file starting at the current |
| 1902 seek position. |
| 1903 tracker_file_name: The name of the tracker file for this component. |
| 1904 src_obj_metadata: Metadata from the source object. Must include etag and |
| 1905 generation. |
| 1906 start_byte: The first byte to be downloaded for this parallel component. |
| 1907 end_byte: The last byte to be downloaded for this parallel component. |
| 1908 """ |
| 1909 self._orig_fp = fp |
| 1910 self._tracker_file_name = tracker_file_name |
| 1911 self._src_obj_metadata = src_obj_metadata |
| 1912 self._last_tracker_file_byte = None |
| 1913 self._start_byte = start_byte |
| 1914 self._end_byte = end_byte |
| 1915 |
| 1916 def write(self, data): # pylint: disable=invalid-name |
| 1917 current_file_pos = self._orig_fp.tell() |
| 1918 assert (self._start_byte <= current_file_pos and |
| 1919 current_file_pos + len(data) <= self._end_byte + 1) |
| 1920 |
| 1921 self._orig_fp.write(data) |
| 1922 current_file_pos = self._orig_fp.tell() |
| 1923 |
| 1924 threshold = TRACKERFILE_UPDATE_THRESHOLD |
| 1925 if (self._last_tracker_file_byte is None or |
| 1926 current_file_pos - self._last_tracker_file_byte > threshold or |
| 1927 current_file_pos == self._end_byte + 1): |
| 1928 WriteDownloadComponentTrackerFile( |
| 1929 self._tracker_file_name, self._src_obj_metadata, current_file_pos) |
| 1930 self._last_tracker_file_byte = current_file_pos |
| 1931 |
| 1932 def seek(self, offset, whence=os.SEEK_SET): # pylint: disable=invalid-name |
| 1933 if whence == os.SEEK_END: |
| 1934 self._orig_fp.seek(offset + self._end_byte + 1) |
| 1935 else: |
| 1936 self._orig_fp.seek(offset, whence) |
| 1937 assert self._start_byte <= self._orig_fp.tell() <= self._end_byte + 1 |
| 1938 |
| 1939 def tell(self): # pylint: disable=invalid-name |
| 1940 return self._orig_fp.tell() |
| 1941 |
| 1942 def flush(self): # pylint: disable=invalid-name |
| 1943 self._orig_fp.flush() |
| 1944 |
| 1945 def close(self): # pylint: disable=invalid-name |
| 1946 if self._orig_fp: |
| 1947 self._orig_fp.close() |
| 1948 |
| 1949 |
| 1950 def _PartitionObject(src_url, src_obj_metadata, dst_url, |
| 1951 download_file_name): |
| 1952 """Partitions an object into components to be downloaded. |
| 1953 |
| 1954 Each component is a byte range of the object. The byte ranges |
| 1955 of the returned components are mutually exclusive and collectively |
| 1956 exhaustive. The byte ranges are inclusive at both end points. |
| 1957 |
| 1958 Args: |
| 1959 src_url: Source CloudUrl. |
| 1960 src_obj_metadata: Metadata from the source object. |
| 1961 dst_url: Destination FileUrl. |
| 1962 download_file_name: Temporary file name to be used for the download. |
| 1963 |
| 1964 Returns: |
| 1965 components_to_download: A list of PerformSlicedDownloadObjectToFileArgs |
| 1966 to be used in Apply for the sliced download. |
| 1967 """ |
| 1968 sliced_download_component_size = HumanReadableToBytes( |
| 1969 config.get('GSUtil', 'sliced_object_download_component_size', |
| 1970 DEFAULT_SLICED_OBJECT_DOWNLOAD_COMPONENT_SIZE)) |
| 1971 |
| 1972 max_components = config.getint( |
| 1973 'GSUtil', 'sliced_object_download_max_components', |
| 1974 DEFAULT_SLICED_OBJECT_DOWNLOAD_MAX_COMPONENTS) |
| 1975 |
| 1976 num_components, component_size = _GetPartitionInfo( |
| 1977 src_obj_metadata.size, max_components, sliced_download_component_size) |
| 1978 |
| 1979 components_to_download = [] |
| 1980 component_lengths = [] |
| 1981 for i in range(num_components): |
| 1982 start_byte = i * component_size |
| 1983 end_byte = min((i + 1) * (component_size) - 1, src_obj_metadata.size - 1) |
| 1984 component_lengths.append(end_byte - start_byte + 1) |
| 1985 components_to_download.append( |
| 1986 PerformSlicedDownloadObjectToFileArgs( |
| 1987 i, src_url, src_obj_metadata, dst_url, download_file_name, |
| 1988 start_byte, end_byte)) |
| 1989 return components_to_download, component_lengths |
| 1990 |
| 1991 |
| 1992 def _DoSlicedDownload(src_url, src_obj_metadata, dst_url, download_file_name, |
| 1993 command_obj, logger, copy_exception_handler, |
| 1994 api_selector): |
| 1995 """Downloads a cloud object to a local file using sliced download. |
| 1996 |
| 1997 Byte ranges are decided for each thread/process, and then the parts are |
| 1998 downloaded in parallel. |
| 1999 |
| 2000 Args: |
| 2001 src_url: Source CloudUrl. |
| 2002 src_obj_metadata: Metadata from the source object. |
| 2003 dst_url: Destination FileUrl. |
| 2004 download_file_name: Temporary file name to be used for download. |
| 2005 command_obj: command object for use in Apply in parallel composite uploads. |
| 2006 logger: for outputting log messages. |
| 2007 copy_exception_handler: For handling copy exceptions during Apply. |
| 2008 api_selector: The Cloud API implementation used. |
| 2009 |
| 2010 Returns: |
| 2011 (bytes_transferred, crc32c) |
| 2012 bytes_transferred: Number of bytes transferred from server this call. |
| 2013 crc32c: a crc32c hash value (integer) for the downloaded bytes, or None if |
| 2014 crc32c hashing wasn't performed. |
| 2015 """ |
| 2016 components_to_download, component_lengths = _PartitionObject( |
| 2017 src_url, src_obj_metadata, dst_url, download_file_name) |
| 2018 |
| 2019 num_components = len(components_to_download) |
| 2020 _MaintainSlicedDownloadTrackerFiles(src_obj_metadata, dst_url, |
| 2021 download_file_name, logger, |
| 2022 api_selector, num_components) |
| 2023 |
| 2024 # Resize the download file so each child process can seek to its start byte. |
| 2025 with open(download_file_name, 'ab') as fp: |
| 2026 fp.truncate(src_obj_metadata.size) |
| 2027 |
| 2028 cp_results = command_obj.Apply( |
| 2029 _PerformSlicedDownloadObjectToFile, components_to_download, |
| 2030 copy_exception_handler, arg_checker=gslib.command.DummyArgChecker, |
| 2031 parallel_operations_override=True, should_return_results=True) |
| 2032 |
| 2033 if len(cp_results) < num_components: |
| 2034 raise CommandException( |
| 2035 'Some components of %s were not downloaded successfully. ' |
| 2036 'Please retry this download.' % dst_url.object_name) |
| 2037 |
| 2038 # Crc32c hashes have to be concatenated in the correct order. |
| 2039 cp_results = sorted(cp_results, key=attrgetter('component_num')) |
| 2040 crc32c = cp_results[0].crc32c |
| 2041 if crc32c is not None: |
| 2042 for i in range(1, num_components): |
| 2043 crc32c = ConcatCrc32c(crc32c, cp_results[i].crc32c, |
| 2044 component_lengths[i]) |
| 2045 |
| 2046 bytes_transferred = 0 |
| 2047 expect_gzip = (src_obj_metadata.contentEncoding and |
| 2048 src_obj_metadata.contentEncoding.lower().endswith('gzip')) |
| 2049 for cp_result in cp_results: |
| 2050 bytes_transferred += cp_result.bytes_transferred |
| 2051 server_gzip = (cp_result.server_encoding and |
| 2052 cp_result.server_encoding.lower().endswith('gzip')) |
| 2053 # If the server gzipped any components on the fly, we will have no chance of |
| 2054 # properly reconstructing the file. |
| 2055 if server_gzip and not expect_gzip: |
| 2056 raise CommandException( |
| 2057 'Download of %s failed because the server sent back data with an ' |
| 2058 'unexpected encoding.' % dst_url.object_name) |
| 2059 |
| 2060 return bytes_transferred, crc32c |
| 2061 |
| 2062 |
| 2063 def _DownloadObjectToFileResumable(src_url, src_obj_metadata, dst_url, |
| 2064 download_file_name, gsutil_api, logger, |
| 2065 digesters, component_num=None, start_byte=0, |
| 2066 end_byte=None): |
| 2067 """Downloads an object to a local file using the resumable strategy. |
| 2068 |
| 2069 Args: |
| 2070 src_url: Source CloudUrl. |
| 2071 src_obj_metadata: Metadata from the source object. |
| 2072 dst_url: Destination FileUrl. |
| 2073 download_file_name: Temporary file name to be used for download. |
| 2074 gsutil_api: gsutil Cloud API instance to use for the download. |
| 2075 logger: for outputting log messages. |
| 2076 digesters: Digesters corresponding to the hash algorithms that will be used |
| 2077 for validation. |
| 2078 component_num: Which component of a sliced download this call is for, or |
| 2079 None if this is not a sliced download. |
| 2080 start_byte: The first byte of a byte range for a sliced download. |
| 2081 end_byte: The last byte of a byte range for a sliced download. |
| 2082 |
| 2083 Returns: |
| 2084 (bytes_transferred, server_encoding) |
| 2085 bytes_transferred: Number of bytes transferred from server this call. |
| 2086 server_encoding: Content-encoding string if it was detected that the server |
| 2087 sent encoded bytes during transfer, None otherwise. |
| 2088 """ |
| 2089 if end_byte is None: |
| 2090 end_byte = src_obj_metadata.size - 1 |
| 2091 download_size = end_byte - start_byte + 1 |
| 2092 |
| 2093 is_sliced = component_num is not None |
| 2094 api_selector = gsutil_api.GetApiSelector(provider=src_url.scheme) |
| 1728 server_encoding = None | 2095 server_encoding = None |
| 1729 download_complete = False | 2096 |
| 1730 download_strategy = _SelectDownloadStrategy(dst_url) | 2097 # Used for logging |
| 1731 download_start_point = 0 | 2098 download_name = dst_url.object_name |
| 1732 # This is used for resuming downloads, but also for passing the mediaLink | 2099 if is_sliced: |
| 1733 # and size into the download for new downloads so that we can avoid | 2100 download_name += ' component %d' % component_num |
| 1734 # making an extra HTTP call. | 2101 |
| 1735 serialization_data = None | |
| 1736 serialization_dict = GetDownloadSerializationDict(src_obj_metadata) | |
| 1737 open_files = [] | |
| 1738 try: | 2102 try: |
| 1739 if download_strategy is CloudApi.DownloadStrategy.ONE_SHOT: | 2103 fp = open(download_file_name, 'r+b') |
| 1740 fp = open(download_file_name, 'wb') | 2104 fp.seek(start_byte) |
| 1741 elif download_strategy is CloudApi.DownloadStrategy.RESUMABLE: | 2105 api_selector = gsutil_api.GetApiSelector(provider=src_url.scheme) |
| 1742 # If this is a resumable download, we need to open the file for append and | 2106 existing_file_size = GetFileSize(fp) |
| 1743 # manage a tracker file. | 2107 |
| 1744 if open_files_map.get(download_file_name, False): | 2108 tracker_file_name, download_start_byte = ( |
| 1745 # Ensure another process/thread is not already writing to this file. | 2109 ReadOrCreateDownloadTrackerFile(src_obj_metadata, dst_url, logger, |
| 1746 raise FileConcurrencySkipError | 2110 api_selector, start_byte, |
| 1747 open_files.append(download_file_name) | 2111 existing_file_size, component_num)) |
| 1748 open_files_map[download_file_name] = True | 2112 |
| 1749 fp = open(download_file_name, 'ab') | 2113 if download_start_byte < start_byte or download_start_byte > end_byte + 1: |
| 1750 | 2114 DeleteTrackerFile(tracker_file_name) |
| 1751 resuming = ReadOrCreateDownloadTrackerFile( | 2115 raise CommandException( |
| 1752 src_obj_metadata, dst_url, api_selector) | 2116 'Resumable download start point for %s is not in the correct byte ' |
| 1753 if resuming: | 2117 'range. Deleting tracker file, so if you re-try this download it ' |
| 1754 # Find out how far along we are so we can request the appropriate | 2118 'will start from scratch' % download_name) |
| 1755 # remaining range of the object. | 2119 |
| 1756 existing_file_size = GetFileSize(fp, position_to_eof=True) | 2120 download_complete = (download_start_byte == start_byte + download_size) |
| 1757 if existing_file_size > src_obj_metadata.size: | 2121 resuming = (download_start_byte != start_byte) and not download_complete |
| 1758 DeleteTrackerFile(GetTrackerFilePath( | 2122 if resuming: |
| 1759 dst_url, TrackerFileType.DOWNLOAD, api_selector)) | 2123 logger.info('Resuming download for %s', download_name) |
| 1760 raise CommandException( | 2124 elif download_complete: |
| 1761 '%s is larger (%d) than %s (%d).\nDeleting tracker file, so ' | 2125 logger.info( |
| 1762 'if you re-try this download it will start from scratch' % | 2126 'Download already complete for %s, skipping download but ' |
| 1763 (download_file_name, existing_file_size, src_url.object_name, | 2127 'will run integrity checks.', download_name) |
| 1764 src_obj_metadata.size)) | 2128 |
| 1765 else: | 2129 # This is used for resuming downloads, but also for passing the mediaLink |
| 1766 if existing_file_size == src_obj_metadata.size: | 2130 # and size into the download for new downloads so that we can avoid |
| 1767 logger.info( | 2131 # making an extra HTTP call. |
| 1768 'Download already complete for file %s, skipping download but ' | 2132 serialization_data = GetDownloadSerializationData( |
| 1769 'will run integrity checks.', download_file_name) | 2133 src_obj_metadata, progress=download_start_byte) |
| 1770 download_complete = True | 2134 |
| 1771 else: | 2135 if resuming or download_complete: |
| 1772 download_start_point = existing_file_size | 2136 # Catch up our digester with the hash data. |
| 1773 serialization_dict['progress'] = download_start_point | 2137 bytes_digested = 0 |
| 1774 logger.info('Resuming download for %s', src_url.url_string) | 2138 total_bytes_to_digest = download_start_byte - start_byte |
| 1775 # Catch up our digester with the hash data. | 2139 hash_callback = ProgressCallbackWithBackoff( |
| 1776 if existing_file_size > TEN_MIB: | 2140 total_bytes_to_digest, |
| 1777 for alg_name in digesters: | 2141 FileProgressCallbackHandler( |
| 1778 logger.info( | 2142 ConstructAnnounceText('Hashing', |
| 1779 'Catching up %s for %s', alg_name, download_file_name) | 2143 dst_url.url_string), logger).call) |
| 1780 with open(download_file_name, 'rb') as hash_fp: | 2144 |
| 1781 while True: | 2145 while bytes_digested < total_bytes_to_digest: |
| 1782 data = hash_fp.read(DEFAULT_FILE_BUFFER_SIZE) | 2146 bytes_to_read = min(DEFAULT_FILE_BUFFER_SIZE, |
| 1783 if not data: | 2147 total_bytes_to_digest - bytes_digested) |
| 1784 break | 2148 data = fp.read(bytes_to_read) |
| 1785 for alg_name in digesters: | 2149 bytes_digested += bytes_to_read |
| 1786 digesters[alg_name].update(data) | 2150 for alg_name in digesters: |
| 1787 else: | 2151 digesters[alg_name].update(data) |
| 1788 # Starting a new download, blow away whatever is already there. | 2152 hash_callback.Progress(len(data)) |
| 1789 fp.truncate(0) | 2153 |
| 1790 fp.seek(0) | 2154 elif not is_sliced: |
| 1791 | 2155 # Delete file contents and start entire object download from scratch. |
| 1792 else: | 2156 fp.truncate(0) |
| 1793 raise CommandException('Invalid download strategy %s chosen for' | 2157 existing_file_size = 0 |
| 1794 'file %s' % (download_strategy, fp.name)) | |
| 1795 | |
| 1796 if not dst_url.IsStream(): | |
| 1797 serialization_data = json.dumps(serialization_dict) | |
| 1798 | 2158 |
| 1799 progress_callback = FileProgressCallbackHandler( | 2159 progress_callback = FileProgressCallbackHandler( |
| 1800 ConstructAnnounceText('Downloading', dst_url.url_string), | 2160 ConstructAnnounceText('Downloading', dst_url.url_string), logger, |
| 1801 logger).call | 2161 start_byte, download_size).call |
| 2162 |
| 1802 if global_copy_helper_opts.test_callback_file: | 2163 if global_copy_helper_opts.test_callback_file: |
| 1803 with open(global_copy_helper_opts.test_callback_file, 'rb') as test_fp: | 2164 with open(global_copy_helper_opts.test_callback_file, 'rb') as test_fp: |
| 1804 progress_callback = pickle.loads(test_fp.read()).call | 2165 progress_callback = pickle.loads(test_fp.read()).call |
| 1805 | 2166 |
| 1806 start_time = time.time() | 2167 if is_sliced and src_obj_metadata.size >= ResumableThreshold(): |
| 2168 fp = SlicedDownloadFileWrapper(fp, tracker_file_name, src_obj_metadata, |
| 2169 start_byte, end_byte) |
| 2170 |
| 1807 # TODO: With gzip encoding (which may occur on-the-fly and not be part of | 2171 # TODO: With gzip encoding (which may occur on-the-fly and not be part of |
| 1808 # the object's metadata), when we request a range to resume, it's possible | 2172 # the object's metadata), when we request a range to resume, it's possible |
| 1809 # that the server will just resend the entire object, which means our | 2173 # that the server will just resend the entire object, which means our |
| 1810 # caught-up hash will be incorrect. We recalculate the hash on | 2174 # caught-up hash will be incorrect. We recalculate the hash on |
| 1811 # the local file in the case of a failed gzip hash anyway, but it would | 2175 # the local file in the case of a failed gzip hash anyway, but it would |
| 1812 # be better if we actively detected this case. | 2176 # be better if we actively detected this case. |
| 1813 if not download_complete: | 2177 if not download_complete: |
| 2178 fp.seek(download_start_byte) |
| 1814 server_encoding = gsutil_api.GetObjectMedia( | 2179 server_encoding = gsutil_api.GetObjectMedia( |
| 1815 src_url.bucket_name, src_url.object_name, fp, | 2180 src_url.bucket_name, src_url.object_name, fp, |
| 1816 start_byte=download_start_point, generation=src_url.generation, | 2181 start_byte=download_start_byte, end_byte=end_byte, |
| 1817 object_size=src_obj_metadata.size, | 2182 generation=src_url.generation, object_size=src_obj_metadata.size, |
| 1818 download_strategy=download_strategy, provider=src_url.scheme, | 2183 download_strategy=CloudApi.DownloadStrategy.RESUMABLE, |
| 1819 serialization_data=serialization_data, digesters=digesters, | 2184 provider=src_url.scheme, serialization_data=serialization_data, |
| 1820 progress_callback=progress_callback) | 2185 digesters=digesters, progress_callback=progress_callback) |
| 1821 | |
| 1822 end_time = time.time() | |
| 1823 | |
| 1824 # If a custom test method is defined, call it here. For the copy command, | |
| 1825 # test methods are expected to take one argument: an open file pointer, | |
| 1826 # and are used to perturb the open file during download to exercise | |
| 1827 # download error detection. | |
| 1828 if test_method: | |
| 1829 test_method(fp) | |
| 1830 | 2186 |
| 1831 except ResumableDownloadException as e: | 2187 except ResumableDownloadException as e: |
| 1832 logger.warning('Caught ResumableDownloadException (%s) for file %s.', | 2188 logger.warning('Caught ResumableDownloadException (%s) for download of %s.', |
| 1833 e.reason, file_name) | 2189 e.reason, download_name) |
| 1834 raise | 2190 raise |
| 1835 | |
| 1836 finally: | 2191 finally: |
| 1837 if fp: | 2192 if fp: |
| 1838 fp.close() | 2193 fp.close() |
| 1839 for file_name in open_files: | |
| 1840 open_files_map.delete(file_name) | |
| 1841 | 2194 |
| 1842 # If we decompressed a content-encoding gzip file on the fly, this may not | 2195 bytes_transferred = end_byte - download_start_byte + 1 |
| 1843 # be accurate, but it is the best we can do without going deep into the | 2196 return bytes_transferred, server_encoding |
| 1844 # underlying HTTP libraries. Note that this value is only used for | 2197 |
| 1845 # reporting in log messages; inaccuracy doesn't impact the integrity of the | 2198 |
| 1846 # download. | 2199 def _DownloadObjectToFileNonResumable(src_url, src_obj_metadata, dst_url, |
| 1847 bytes_transferred = src_obj_metadata.size - download_start_point | 2200 download_file_name, gsutil_api, logger, |
| 2201 digesters): |
| 2202 """Downloads an object to a local file using the non-resumable strategy. |
| 2203 |
| 2204 Args: |
| 2205 src_url: Source CloudUrl. |
| 2206 src_obj_metadata: Metadata from the source object. |
| 2207 dst_url: Destination FileUrl. |
| 2208 download_file_name: Temporary file name to be used for download. |
| 2209 gsutil_api: gsutil Cloud API instance to use for the download. |
| 2210 logger: for outputting log messages. |
| 2211 digesters: Digesters corresponding to the hash algorithms that will be used |
| 2212 for validation. |
| 2213 Returns: |
| 2214 (bytes_transferred, server_encoding) |
| 2215 bytes_transferred: Number of bytes transferred from server this call. |
| 2216 server_encoding: Content-encoding string if it was detected that the server |
| 2217 sent encoded bytes during transfer, None otherwise. |
| 2218 """ |
| 2219 try: |
| 2220 fp = open(download_file_name, 'w') |
| 2221 |
| 2222 # This is used to pass the mediaLink and the size into the download so that |
| 2223 # we can avoid making an extra HTTP call. |
| 2224 serialization_data = GetDownloadSerializationData(src_obj_metadata) |
| 2225 |
| 2226 progress_callback = FileProgressCallbackHandler( |
| 2227 ConstructAnnounceText('Downloading', dst_url.url_string), logger).call |
| 2228 |
| 2229 if global_copy_helper_opts.test_callback_file: |
| 2230 with open(global_copy_helper_opts.test_callback_file, 'rb') as test_fp: |
| 2231 progress_callback = pickle.loads(test_fp.read()).call |
| 2232 |
| 2233 server_encoding = gsutil_api.GetObjectMedia( |
| 2234 src_url.bucket_name, src_url.object_name, fp, |
| 2235 generation=src_url.generation, object_size=src_obj_metadata.size, |
| 2236 download_strategy=CloudApi.DownloadStrategy.ONE_SHOT, |
| 2237 provider=src_url.scheme, serialization_data=serialization_data, |
| 2238 digesters=digesters, progress_callback=progress_callback) |
| 2239 finally: |
| 2240 if fp: |
| 2241 fp.close() |
| 2242 |
| 2243 return src_obj_metadata.size, server_encoding |
| 2244 |
| 2245 |
| 2246 def _DownloadObjectToFile(src_url, src_obj_metadata, dst_url, |
| 2247 gsutil_api, logger, command_obj, |
| 2248 copy_exception_handler, allow_splitting=True): |
| 2249 """Downloads an object to a local file. |
| 2250 |
| 2251 Args: |
| 2252 src_url: Source CloudUrl. |
| 2253 src_obj_metadata: Metadata from the source object. |
| 2254 dst_url: Destination FileUrl. |
| 2255 gsutil_api: gsutil Cloud API instance to use for the download. |
| 2256 logger: for outputting log messages. |
| 2257 command_obj: command object for use in Apply in sliced downloads. |
| 2258 copy_exception_handler: For handling copy exceptions during Apply. |
| 2259 allow_splitting: Whether or not to allow sliced download. |
| 2260 Returns: |
| 2261 (elapsed_time, bytes_transferred, dst_url, md5), where time elapsed |
| 2262 excludes initial GET. |
| 2263 |
| 2264 Raises: |
| 2265 FileConcurrencySkipError: if this download is already in progress. |
| 2266 CommandException: if other errors encountered. |
| 2267 """ |
| 2268 global open_files_map, open_files_lock |
| 2269 if dst_url.object_name.endswith(dst_url.delim): |
| 2270 logger.warn('\n'.join(textwrap.wrap( |
| 2271 'Skipping attempt to download to filename ending with slash (%s). This ' |
| 2272 'typically happens when using gsutil to download from a subdirectory ' |
| 2273 'created by the Cloud Console (https://cloud.google.com/console)' |
| 2274 % dst_url.object_name))) |
| 2275 return (0, 0, dst_url, '') |
| 2276 |
| 2277 api_selector = gsutil_api.GetApiSelector(provider=src_url.scheme) |
| 2278 download_strategy = _SelectDownloadStrategy(dst_url) |
| 2279 sliced_download = _ShouldDoSlicedDownload( |
| 2280 download_strategy, src_obj_metadata, allow_splitting, logger) |
| 2281 |
| 2282 download_file_name, need_to_unzip = _GetDownloadFile( |
| 2283 dst_url, src_obj_metadata, logger) |
| 2284 |
| 2285 # Ensure another process/thread is not already writing to this file. |
| 2286 with open_files_lock: |
| 2287 if open_files_map.get(download_file_name, False): |
| 2288 raise FileConcurrencySkipError |
| 2289 open_files_map[download_file_name] = True |
| 2290 |
| 2291 # Set up hash digesters. |
| 2292 consider_md5 = src_obj_metadata.md5Hash and not sliced_download |
| 2293 hash_algs = GetDownloadHashAlgs(logger, consider_md5=consider_md5, |
| 2294 consider_crc32c=src_obj_metadata.crc32c) |
| 2295 digesters = dict((alg, hash_algs[alg]()) for alg in hash_algs or {}) |
| 2296 |
| 2297 # Tracks whether the server used a gzip encoding. |
| 2298 server_encoding = None |
| 2299 download_complete = (src_obj_metadata.size == 0) |
| 2300 bytes_transferred = 0 |
| 2301 |
| 2302 start_time = time.time() |
| 2303 if not download_complete: |
| 2304 if sliced_download: |
| 2305 (bytes_transferred, crc32c) = ( |
| 2306 _DoSlicedDownload(src_url, src_obj_metadata, dst_url, |
| 2307 download_file_name, command_obj, logger, |
| 2308 copy_exception_handler, api_selector)) |
| 2309 if 'crc32c' in digesters: |
| 2310 digesters['crc32c'].crcValue = crc32c |
| 2311 elif download_strategy is CloudApi.DownloadStrategy.ONE_SHOT: |
| 2312 (bytes_transferred, server_encoding) = ( |
| 2313 _DownloadObjectToFileNonResumable(src_url, src_obj_metadata, dst_url, |
| 2314 download_file_name, gsutil_api, |
| 2315 logger, digesters)) |
| 2316 elif download_strategy is CloudApi.DownloadStrategy.RESUMABLE: |
| 2317 (bytes_transferred, server_encoding) = ( |
| 2318 _DownloadObjectToFileResumable(src_url, src_obj_metadata, dst_url, |
| 2319 download_file_name, gsutil_api, logger, |
| 2320 digesters)) |
| 2321 else: |
| 2322 raise CommandException('Invalid download strategy %s chosen for' |
| 2323 'file %s' % (download_strategy, |
| 2324 download_file_name)) |
| 2325 end_time = time.time() |
| 2326 |
| 1848 server_gzip = server_encoding and server_encoding.lower().endswith('gzip') | 2327 server_gzip = server_encoding and server_encoding.lower().endswith('gzip') |
| 1849 local_md5 = _ValidateDownloadHashes(logger, src_url, src_obj_metadata, | 2328 local_md5 = _ValidateAndCompleteDownload( |
| 1850 dst_url, need_to_unzip, server_gzip, | 2329 logger, src_url, src_obj_metadata, dst_url, need_to_unzip, server_gzip, |
| 1851 digesters, hash_algs, api_selector, | 2330 digesters, hash_algs, download_file_name, api_selector, bytes_transferred) |
| 1852 bytes_transferred) | 2331 |
| 2332 with open_files_lock: |
| 2333 open_files_map.delete(download_file_name) |
| 1853 | 2334 |
| 1854 return (end_time - start_time, bytes_transferred, dst_url, local_md5) | 2335 return (end_time - start_time, bytes_transferred, dst_url, local_md5) |
| 1855 | 2336 |
| 1856 | 2337 |
| 1857 def _GetDownloadZipFileName(file_name): | 2338 def _GetDownloadTempZipFileName(dst_url): |
| 1858 """Returns the file name for a temporarily compressed downloaded file.""" | 2339 """Returns temporary file name for a temporarily compressed download.""" |
| 1859 return '%s_.gztmp' % file_name | 2340 return '%s_.gztmp' % dst_url.object_name |
| 1860 | 2341 |
| 1861 | 2342 |
| 1862 def _ValidateDownloadHashes(logger, src_url, src_obj_metadata, dst_url, | 2343 def _GetDownloadTempFileName(dst_url): |
| 1863 need_to_unzip, server_gzip, digesters, hash_algs, | 2344 """Returns temporary download file name for uncompressed downloads.""" |
| 1864 api_selector, bytes_transferred): | 2345 return '%s_.gstmp' % dst_url.object_name |
| 1865 """Validates a downloaded file's integrity. | 2346 |
| 2347 |
| 2348 def _ValidateAndCompleteDownload(logger, src_url, src_obj_metadata, dst_url, |
| 2349 need_to_unzip, server_gzip, digesters, |
| 2350 hash_algs, download_file_name, |
| 2351 api_selector, bytes_transferred): |
| 2352 """Validates and performs necessary operations on a downloaded file. |
| 2353 |
| 2354 Validates the integrity of the downloaded file using hash_algs. If the file |
| 2355 was compressed (temporarily), the file will be decompressed. Then, if the |
| 2356 integrity of the file was successfully validated, the file will be moved |
| 2357 from its temporary download location to its permanent location on disk. |
| 1866 | 2358 |
| 1867 Args: | 2359 Args: |
| 1868 logger: For outputting log messages. | 2360 logger: For outputting log messages. |
| 1869 src_url: StorageUrl for the source object. | 2361 src_url: StorageUrl for the source object. |
| 1870 src_obj_metadata: Metadata for the source object, potentially containing | 2362 src_obj_metadata: Metadata for the source object, potentially containing |
| 1871 hash values. | 2363 hash values. |
| 1872 dst_url: StorageUrl describing the destination file. | 2364 dst_url: StorageUrl describing the destination file. |
| 1873 need_to_unzip: If true, a temporary zip file was used and must be | 2365 need_to_unzip: If true, a temporary zip file was used and must be |
| 1874 uncompressed as part of validation. | 2366 uncompressed as part of validation. |
| 1875 server_gzip: If true, the server gzipped the bytes (regardless of whether | 2367 server_gzip: If true, the server gzipped the bytes (regardless of whether |
| 1876 the object metadata claimed it was gzipped). | 2368 the object metadata claimed it was gzipped). |
| 1877 digesters: dict of {string, hash digester} that contains up-to-date digests | 2369 digesters: dict of {string, hash digester} that contains up-to-date digests |
| 1878 computed during the download. If a digester for a particular | 2370 computed during the download. If a digester for a particular |
| 1879 algorithm is None, an up-to-date digest is not available and the | 2371 algorithm is None, an up-to-date digest is not available and the |
| 1880 hash must be recomputed from the local file. | 2372 hash must be recomputed from the local file. |
| 1881 hash_algs: dict of {string, hash algorithm} that can be used if digesters | 2373 hash_algs: dict of {string, hash algorithm} that can be used if digesters |
| 1882 don't have up-to-date digests. | 2374 don't have up-to-date digests. |
| 2375 download_file_name: Temporary file name that was used for download. |
| 1883 api_selector: The Cloud API implementation used (used tracker file naming). | 2376 api_selector: The Cloud API implementation used (used tracker file naming). |
| 1884 bytes_transferred: Number of bytes downloaded (used for logging). | 2377 bytes_transferred: Number of bytes downloaded (used for logging). |
| 1885 | 2378 |
| 1886 Returns: | 2379 Returns: |
| 1887 An MD5 of the local file, if one was calculated as part of the integrity | 2380 An MD5 of the local file, if one was calculated as part of the integrity |
| 1888 check. | 2381 check. |
| 1889 """ | 2382 """ |
| 1890 file_name = dst_url.object_name | 2383 final_file_name = dst_url.object_name |
| 1891 download_file_name = (_GetDownloadZipFileName(file_name) if need_to_unzip else | 2384 file_name = download_file_name |
| 1892 file_name) | |
| 1893 digesters_succeeded = True | 2385 digesters_succeeded = True |
| 2386 |
| 1894 for alg in digesters: | 2387 for alg in digesters: |
| 1895 # If we get a digester with a None algorithm, the underlying | 2388 # If we get a digester with a None algorithm, the underlying |
| 1896 # implementation failed to calculate a digest, so we will need to | 2389 # implementation failed to calculate a digest, so we will need to |
| 1897 # calculate one from scratch. | 2390 # calculate one from scratch. |
| 1898 if not digesters[alg]: | 2391 if not digesters[alg]: |
| 1899 digesters_succeeded = False | 2392 digesters_succeeded = False |
| 1900 break | 2393 break |
| 1901 | 2394 |
| 1902 if digesters_succeeded: | 2395 if digesters_succeeded: |
| 1903 local_hashes = _CreateDigestsFromDigesters(digesters) | 2396 local_hashes = _CreateDigestsFromDigesters(digesters) |
| 1904 else: | 2397 else: |
| 1905 local_hashes = _CreateDigestsFromLocalFile( | 2398 local_hashes = _CreateDigestsFromLocalFile( |
| 1906 logger, hash_algs, download_file_name, src_obj_metadata) | 2399 logger, hash_algs, file_name, final_file_name, src_obj_metadata) |
| 1907 | 2400 |
| 1908 digest_verified = True | 2401 digest_verified = True |
| 1909 hash_invalid_exception = None | 2402 hash_invalid_exception = None |
| 1910 try: | 2403 try: |
| 1911 _CheckHashes(logger, src_url, src_obj_metadata, download_file_name, | 2404 _CheckHashes(logger, src_url, src_obj_metadata, final_file_name, |
| 1912 local_hashes) | 2405 local_hashes) |
| 1913 DeleteTrackerFile(GetTrackerFilePath( | 2406 DeleteDownloadTrackerFiles(dst_url, api_selector) |
| 1914 dst_url, TrackerFileType.DOWNLOAD, api_selector)) | |
| 1915 except HashMismatchException, e: | 2407 except HashMismatchException, e: |
| 1916 # If an non-gzipped object gets sent with gzip content encoding, the hash | 2408 # If an non-gzipped object gets sent with gzip content encoding, the hash |
| 1917 # we calculate will match the gzipped bytes, not the original object. Thus, | 2409 # we calculate will match the gzipped bytes, not the original object. Thus, |
| 1918 # we'll need to calculate and check it after unzipping. | 2410 # we'll need to calculate and check it after unzipping. |
| 1919 if server_gzip: | 2411 if server_gzip: |
| 1920 logger.debug( | 2412 logger.debug( |
| 1921 'Hash did not match but server gzipped the content, will ' | 2413 'Hash did not match but server gzipped the content, will ' |
| 1922 'recalculate.') | 2414 'recalculate.') |
| 1923 digest_verified = False | 2415 digest_verified = False |
| 1924 elif api_selector == ApiSelector.XML: | 2416 elif api_selector == ApiSelector.XML: |
| 1925 logger.debug( | 2417 logger.debug( |
| 1926 'Hash did not match but server may have gzipped the content, will ' | 2418 'Hash did not match but server may have gzipped the content, will ' |
| 1927 'recalculate.') | 2419 'recalculate.') |
| 1928 # Save off the exception in case this isn't a gzipped file. | 2420 # Save off the exception in case this isn't a gzipped file. |
| 1929 hash_invalid_exception = e | 2421 hash_invalid_exception = e |
| 1930 digest_verified = False | 2422 digest_verified = False |
| 1931 else: | 2423 else: |
| 1932 DeleteTrackerFile(GetTrackerFilePath( | 2424 DeleteDownloadTrackerFiles(dst_url, api_selector) |
| 1933 dst_url, TrackerFileType.DOWNLOAD, api_selector)) | |
| 1934 if _RENAME_ON_HASH_MISMATCH: | 2425 if _RENAME_ON_HASH_MISMATCH: |
| 1935 os.rename(download_file_name, | 2426 os.rename(file_name, |
| 1936 download_file_name + _RENAME_ON_HASH_MISMATCH_SUFFIX) | 2427 final_file_name + _RENAME_ON_HASH_MISMATCH_SUFFIX) |
| 1937 else: | 2428 else: |
| 1938 os.unlink(download_file_name) | 2429 os.unlink(file_name) |
| 1939 raise | 2430 raise |
| 1940 | 2431 |
| 1941 if server_gzip and not need_to_unzip: | |
| 1942 # Server compressed bytes on-the-fly, thus we need to rename and decompress. | |
| 1943 # We can't decompress on-the-fly because prior to Python 3.2 the gzip | |
| 1944 # module makes a bunch of seek calls on the stream. | |
| 1945 download_file_name = _GetDownloadZipFileName(file_name) | |
| 1946 os.rename(file_name, download_file_name) | |
| 1947 | |
| 1948 if need_to_unzip or server_gzip: | 2432 if need_to_unzip or server_gzip: |
| 1949 # Log that we're uncompressing if the file is big enough that | 2433 # Log that we're uncompressing if the file is big enough that |
| 1950 # decompressing would make it look like the transfer "stalled" at the end. | 2434 # decompressing would make it look like the transfer "stalled" at the end. |
| 1951 if bytes_transferred > TEN_MIB: | 2435 if bytes_transferred > TEN_MIB: |
| 1952 logger.info( | 2436 logger.info( |
| 1953 'Uncompressing downloaded tmp file to %s...', file_name) | 2437 'Uncompressing temporarily gzipped file to %s...', final_file_name) |
| 1954 | 2438 |
| 1955 # Downloaded gzipped file to a filename w/o .gz extension, so unzip. | |
| 1956 gzip_fp = None | 2439 gzip_fp = None |
| 1957 try: | 2440 try: |
| 1958 gzip_fp = gzip.open(download_file_name, 'rb') | 2441 # Downloaded temporarily gzipped file, unzip to file without '_.gztmp' |
| 1959 with open(file_name, 'wb') as f_out: | 2442 # suffix. |
| 2443 gzip_fp = gzip.open(file_name, 'rb') |
| 2444 with open(final_file_name, 'wb') as f_out: |
| 1960 data = gzip_fp.read(GZIP_CHUNK_SIZE) | 2445 data = gzip_fp.read(GZIP_CHUNK_SIZE) |
| 1961 while data: | 2446 while data: |
| 1962 f_out.write(data) | 2447 f_out.write(data) |
| 1963 data = gzip_fp.read(GZIP_CHUNK_SIZE) | 2448 data = gzip_fp.read(GZIP_CHUNK_SIZE) |
| 1964 except IOError, e: | 2449 except IOError, e: |
| 1965 # In the XML case where we don't know if the file was gzipped, raise | 2450 # In the XML case where we don't know if the file was gzipped, raise |
| 1966 # the original hash exception if we find that it wasn't. | 2451 # the original hash exception if we find that it wasn't. |
| 1967 if 'Not a gzipped file' in str(e) and hash_invalid_exception: | 2452 if 'Not a gzipped file' in str(e) and hash_invalid_exception: |
| 1968 # Linter improperly thinks we're raising None despite the above check. | 2453 # Linter improperly thinks we're raising None despite the above check. |
| 1969 # pylint: disable=raising-bad-type | 2454 # pylint: disable=raising-bad-type |
| 1970 raise hash_invalid_exception | 2455 raise hash_invalid_exception |
| 1971 finally: | 2456 finally: |
| 1972 if gzip_fp: | 2457 if gzip_fp: |
| 1973 gzip_fp.close() | 2458 gzip_fp.close() |
| 1974 | 2459 |
| 1975 os.unlink(download_file_name) | 2460 os.unlink(file_name) |
| 2461 file_name = final_file_name |
| 1976 | 2462 |
| 1977 if not digest_verified: | 2463 if not digest_verified: |
| 1978 try: | 2464 try: |
| 1979 # Recalculate hashes on the unzipped local file. | 2465 # Recalculate hashes on the unzipped local file. |
| 1980 local_hashes = _CreateDigestsFromLocalFile(logger, hash_algs, file_name, | 2466 local_hashes = _CreateDigestsFromLocalFile( |
| 1981 src_obj_metadata) | 2467 logger, hash_algs, file_name, final_file_name, src_obj_metadata) |
| 1982 _CheckHashes(logger, src_url, src_obj_metadata, file_name, local_hashes) | 2468 _CheckHashes(logger, src_url, src_obj_metadata, final_file_name, |
| 1983 DeleteTrackerFile(GetTrackerFilePath( | 2469 local_hashes) |
| 1984 dst_url, TrackerFileType.DOWNLOAD, api_selector)) | 2470 DeleteDownloadTrackerFiles(dst_url, api_selector) |
| 1985 except HashMismatchException: | 2471 except HashMismatchException: |
| 1986 DeleteTrackerFile(GetTrackerFilePath( | 2472 DeleteDownloadTrackerFiles(dst_url, api_selector) |
| 1987 dst_url, TrackerFileType.DOWNLOAD, api_selector)) | |
| 1988 if _RENAME_ON_HASH_MISMATCH: | 2473 if _RENAME_ON_HASH_MISMATCH: |
| 1989 os.rename(file_name, | 2474 os.rename(file_name, |
| 1990 file_name + _RENAME_ON_HASH_MISMATCH_SUFFIX) | 2475 file_name + _RENAME_ON_HASH_MISMATCH_SUFFIX) |
| 1991 else: | 2476 else: |
| 1992 os.unlink(file_name) | 2477 os.unlink(file_name) |
| 1993 raise | 2478 raise |
| 1994 | 2479 |
| 2480 if file_name != final_file_name: |
| 2481 # Data is still in a temporary file, so move it to a permanent location. |
| 2482 if os.path.exists(final_file_name): |
| 2483 os.unlink(final_file_name) |
| 2484 os.rename(file_name, |
| 2485 final_file_name) |
| 2486 |
| 1995 if 'md5' in local_hashes: | 2487 if 'md5' in local_hashes: |
| 1996 return local_hashes['md5'] | 2488 return local_hashes['md5'] |
| 1997 | 2489 |
| 1998 | 2490 |
| 1999 def _CopyFileToFile(src_url, dst_url): | 2491 def _CopyFileToFile(src_url, dst_url): |
| 2000 """Copies a local file to a local file. | 2492 """Copies a local file to a local file. |
| 2001 | 2493 |
| 2002 Args: | 2494 Args: |
| 2003 src_url: Source FileUrl. | 2495 src_url: Source FileUrl. |
| 2004 dst_url: Destination FileUrl. | 2496 dst_url: Destination FileUrl. |
| (...skipping 111 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 2116 result_url, uploaded_object.generation) | 2608 result_url, uploaded_object.generation) |
| 2117 | 2609 |
| 2118 return (end_time - start_time, src_obj_metadata.size, result_url, | 2610 return (end_time - start_time, src_obj_metadata.size, result_url, |
| 2119 uploaded_object.md5Hash) | 2611 uploaded_object.md5Hash) |
| 2120 | 2612 |
| 2121 | 2613 |
| 2122 # pylint: disable=undefined-variable | 2614 # pylint: disable=undefined-variable |
| 2123 # pylint: disable=too-many-statements | 2615 # pylint: disable=too-many-statements |
| 2124 def PerformCopy(logger, src_url, dst_url, gsutil_api, command_obj, | 2616 def PerformCopy(logger, src_url, dst_url, gsutil_api, command_obj, |
| 2125 copy_exception_handler, allow_splitting=True, | 2617 copy_exception_handler, allow_splitting=True, |
| 2126 headers=None, manifest=None, gzip_exts=None, test_method=None): | 2618 headers=None, manifest=None, gzip_exts=None): |
| 2127 """Performs copy from src_url to dst_url, handling various special cases. | 2619 """Performs copy from src_url to dst_url, handling various special cases. |
| 2128 | 2620 |
| 2129 Args: | 2621 Args: |
| 2130 logger: for outputting log messages. | 2622 logger: for outputting log messages. |
| 2131 src_url: Source StorageUrl. | 2623 src_url: Source StorageUrl. |
| 2132 dst_url: Destination StorageUrl. | 2624 dst_url: Destination StorageUrl. |
| 2133 gsutil_api: gsutil Cloud API instance to use for the copy. | 2625 gsutil_api: gsutil Cloud API instance to use for the copy. |
| 2134 command_obj: command object for use in Apply in parallel composite uploads. | 2626 command_obj: command object for use in Apply in parallel composite uploads |
| 2627 and sliced object downloads. |
| 2135 copy_exception_handler: for handling copy exceptions during Apply. | 2628 copy_exception_handler: for handling copy exceptions during Apply. |
| 2136 allow_splitting: Whether to allow the file to be split into component | 2629 allow_splitting: Whether to allow the file to be split into component |
| 2137 pieces for an parallel composite upload. | 2630 pieces for an parallel composite upload or download. |
| 2138 headers: optional headers to use for the copy operation. | 2631 headers: optional headers to use for the copy operation. |
| 2139 manifest: optional manifest for tracking copy operations. | 2632 manifest: optional manifest for tracking copy operations. |
| 2140 gzip_exts: List of file extensions to gzip for uploads, if any. | 2633 gzip_exts: List of file extensions to gzip for uploads, if any. |
| 2141 test_method: optional test method for modifying files during unit tests. | |
| 2142 | 2634 |
| 2143 Returns: | 2635 Returns: |
| 2144 (elapsed_time, bytes_transferred, version-specific dst_url) excluding | 2636 (elapsed_time, bytes_transferred, version-specific dst_url) excluding |
| 2145 overhead like initial GET. | 2637 overhead like initial GET. |
| 2146 | 2638 |
| 2147 Raises: | 2639 Raises: |
| 2148 ItemExistsError: if no clobber flag is specified and the destination | 2640 ItemExistsError: if no clobber flag is specified and the destination |
| 2149 object already exists. | 2641 object already exists. |
| 2150 SkipUnsupportedObjectError: if skip_unsupported_objects flag is specified | 2642 SkipUnsupportedObjectError: if skip_unsupported_objects flag is specified |
| 2151 and the source is an unsupported type. | 2643 and the source is an unsupported type. |
| (...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 2187 if global_copy_helper_opts.preserve_acl: | 2679 if global_copy_helper_opts.preserve_acl: |
| 2188 src_obj_fields.append('acl') | 2680 src_obj_fields.append('acl') |
| 2189 if (src_url.scheme == dst_url.scheme | 2681 if (src_url.scheme == dst_url.scheme |
| 2190 and not global_copy_helper_opts.daisy_chain): | 2682 and not global_copy_helper_opts.daisy_chain): |
| 2191 copy_in_the_cloud = True | 2683 copy_in_the_cloud = True |
| 2192 else: | 2684 else: |
| 2193 copy_in_the_cloud = False | 2685 copy_in_the_cloud = False |
| 2194 else: | 2686 else: |
| 2195 # Just get the fields needed to validate the download. | 2687 # Just get the fields needed to validate the download. |
| 2196 src_obj_fields = ['crc32c', 'contentEncoding', 'contentType', 'etag', | 2688 src_obj_fields = ['crc32c', 'contentEncoding', 'contentType', 'etag', |
| 2197 'mediaLink', 'md5Hash', 'size'] | 2689 'mediaLink', 'md5Hash', 'size', 'generation'] |
| 2198 | 2690 |
| 2199 if (src_url.scheme == 's3' and | 2691 if (src_url.scheme == 's3' and |
| 2200 global_copy_helper_opts.skip_unsupported_objects): | 2692 global_copy_helper_opts.skip_unsupported_objects): |
| 2201 src_obj_fields.append('storageClass') | 2693 src_obj_fields.append('storageClass') |
| 2202 | 2694 |
| 2203 try: | 2695 try: |
| 2204 src_generation = GenerationFromUrlAndString(src_url, src_url.generation) | 2696 src_generation = GenerationFromUrlAndString(src_url, src_url.generation) |
| 2205 src_obj_metadata = gsutil_api.GetObjectMetadata( | 2697 src_obj_metadata = gsutil_api.GetObjectMetadata( |
| 2206 src_url.bucket_name, src_url.object_name, | 2698 src_url.bucket_name, src_url.object_name, |
| 2207 generation=src_generation, provider=src_url.scheme, | 2699 generation=src_generation, provider=src_url.scheme, |
| (...skipping 16 matching lines...) Expand all Loading... |
| 2224 # dst_url will be verified in _CopyObjToObjDaisyChainMode if it | 2716 # dst_url will be verified in _CopyObjToObjDaisyChainMode if it |
| 2225 # is not s3 (and thus differs from src_url). | 2717 # is not s3 (and thus differs from src_url). |
| 2226 if src_url.scheme == 's3': | 2718 if src_url.scheme == 's3': |
| 2227 acl_text = S3MarkerAclFromObjectMetadata(src_obj_metadata) | 2719 acl_text = S3MarkerAclFromObjectMetadata(src_obj_metadata) |
| 2228 if acl_text: | 2720 if acl_text: |
| 2229 AddS3MarkerAclToObjectMetadata(dst_obj_metadata, acl_text) | 2721 AddS3MarkerAclToObjectMetadata(dst_obj_metadata, acl_text) |
| 2230 else: | 2722 else: |
| 2231 try: | 2723 try: |
| 2232 src_obj_filestream = GetStreamFromFileUrl(src_url) | 2724 src_obj_filestream = GetStreamFromFileUrl(src_url) |
| 2233 except Exception, e: # pylint: disable=broad-except | 2725 except Exception, e: # pylint: disable=broad-except |
| 2234 raise CommandException('Error opening file "%s": %s.' % (src_url, | 2726 if command_obj.continue_on_error: |
| 2235 e.message)) | 2727 message = 'Error copying %s: %s' % (src_url, str(e)) |
| 2728 command_obj.op_failure_count += 1 |
| 2729 logger.error(message) |
| 2730 return |
| 2731 else: |
| 2732 raise CommandException('Error opening file "%s": %s.' % (src_url, |
| 2733 e.message)) |
| 2236 if src_url.IsStream(): | 2734 if src_url.IsStream(): |
| 2237 src_obj_size = None | 2735 src_obj_size = None |
| 2238 else: | 2736 else: |
| 2239 src_obj_size = os.path.getsize(src_url.object_name) | 2737 src_obj_size = os.path.getsize(src_url.object_name) |
| 2240 | 2738 |
| 2241 if global_copy_helper_opts.use_manifest: | 2739 if global_copy_helper_opts.use_manifest: |
| 2242 # Set the source size in the manifest. | 2740 # Set the source size in the manifest. |
| 2243 manifest.Set(src_url.url_string, 'size', src_obj_size) | 2741 manifest.Set(src_url.url_string, 'size', src_obj_size) |
| 2244 | 2742 |
| 2245 if (dst_url.scheme == 's3' and src_obj_size > S3_MAX_UPLOAD_SIZE | 2743 if (dst_url.scheme == 's3' and src_obj_size > S3_MAX_UPLOAD_SIZE |
| (...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 2301 _SetContentTypeFromFile(src_url, dst_obj_metadata) | 2799 _SetContentTypeFromFile(src_url, dst_obj_metadata) |
| 2302 else: | 2800 else: |
| 2303 # Files don't have Cloud API metadata. | 2801 # Files don't have Cloud API metadata. |
| 2304 dst_obj_metadata = None | 2802 dst_obj_metadata = None |
| 2305 | 2803 |
| 2306 _LogCopyOperation(logger, src_url, dst_url, dst_obj_metadata) | 2804 _LogCopyOperation(logger, src_url, dst_url, dst_obj_metadata) |
| 2307 | 2805 |
| 2308 if src_url.IsCloudUrl(): | 2806 if src_url.IsCloudUrl(): |
| 2309 if dst_url.IsFileUrl(): | 2807 if dst_url.IsFileUrl(): |
| 2310 return _DownloadObjectToFile(src_url, src_obj_metadata, dst_url, | 2808 return _DownloadObjectToFile(src_url, src_obj_metadata, dst_url, |
| 2311 gsutil_api, logger, test_method=test_method) | 2809 gsutil_api, logger, command_obj, |
| 2810 copy_exception_handler, |
| 2811 allow_splitting=allow_splitting) |
| 2312 elif copy_in_the_cloud: | 2812 elif copy_in_the_cloud: |
| 2313 return _CopyObjToObjInTheCloud(src_url, src_obj_metadata, dst_url, | 2813 return _CopyObjToObjInTheCloud(src_url, src_obj_metadata, dst_url, |
| 2314 dst_obj_metadata, preconditions, | 2814 dst_obj_metadata, preconditions, |
| 2315 gsutil_api, logger) | 2815 gsutil_api, logger) |
| 2316 else: | 2816 else: |
| 2317 return _CopyObjToObjDaisyChainMode(src_url, src_obj_metadata, | 2817 return _CopyObjToObjDaisyChainMode(src_url, src_obj_metadata, |
| 2318 dst_url, dst_obj_metadata, | 2818 dst_url, dst_obj_metadata, |
| 2319 preconditions, gsutil_api, logger) | 2819 preconditions, gsutil_api, logger) |
| 2320 else: # src_url.IsFileUrl() | 2820 else: # src_url.IsFileUrl() |
| 2321 if dst_url.IsCloudUrl(): | 2821 if dst_url.IsCloudUrl(): |
| (...skipping 170 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 2492 if past_scheme.find(sep) == -1: | 2992 if past_scheme.find(sep) == -1: |
| 2493 return 'file://' | 2993 return 'file://' |
| 2494 else: | 2994 else: |
| 2495 return 'file://%s' % past_scheme.rstrip(sep).rpartition(sep)[0] | 2995 return 'file://%s' % past_scheme.rstrip(sep).rpartition(sep)[0] |
| 2496 if url.IsBucket(): | 2996 if url.IsBucket(): |
| 2497 return '%s://' % url.scheme | 2997 return '%s://' % url.scheme |
| 2498 # Else it names a bucket subdir. | 2998 # Else it names a bucket subdir. |
| 2499 return url.url_string.rstrip(sep).rpartition(sep)[0] | 2999 return url.url_string.rstrip(sep).rpartition(sep)[0] |
| 2500 | 3000 |
| 2501 | 3001 |
| 2502 def _DivideAndCeil(dividend, divisor): | |
| 2503 """Returns ceil(dividend / divisor). | |
| 2504 | |
| 2505 Takes care to avoid the pitfalls of floating point arithmetic that could | |
| 2506 otherwise yield the wrong result for large numbers. | |
| 2507 | |
| 2508 Args: | |
| 2509 dividend: Dividend for the operation. | |
| 2510 divisor: Divisor for the operation. | |
| 2511 | |
| 2512 Returns: | |
| 2513 Quotient. | |
| 2514 """ | |
| 2515 quotient = dividend // divisor | |
| 2516 if (dividend % divisor) != 0: | |
| 2517 quotient += 1 | |
| 2518 return quotient | |
| 2519 | |
| 2520 | |
| 2521 def _GetPartitionInfo(file_size, max_components, default_component_size): | 3002 def _GetPartitionInfo(file_size, max_components, default_component_size): |
| 2522 """Gets info about a file partition for parallel composite uploads. | 3003 """Gets info about a file partition for parallel file/object transfers. |
| 2523 | 3004 |
| 2524 Args: | 3005 Args: |
| 2525 file_size: The number of bytes in the file to be partitioned. | 3006 file_size: The number of bytes in the file to be partitioned. |
| 2526 max_components: The maximum number of components that can be composed. | 3007 max_components: The maximum number of components that can be composed. |
| 2527 default_component_size: The size of a component, assuming that | 3008 default_component_size: The size of a component, assuming that |
| 2528 max_components is infinite. | 3009 max_components is infinite. |
| 2529 Returns: | 3010 Returns: |
| 2530 The number of components in the partitioned file, and the size of each | 3011 The number of components in the partitioned file, and the size of each |
| 2531 component (except the last, which will have a different size iff | 3012 component (except the last, which will have a different size iff |
| 2532 file_size != 0 (mod num_components)). | 3013 file_size != 0 (mod num_components)). |
| 2533 """ | 3014 """ |
| 2534 # num_components = ceil(file_size / default_component_size) | 3015 # num_components = ceil(file_size / default_component_size) |
| 2535 num_components = _DivideAndCeil(file_size, default_component_size) | 3016 num_components = DivideAndCeil(file_size, default_component_size) |
| 2536 | 3017 |
| 2537 # num_components must be in the range [2, max_components] | 3018 # num_components must be in the range [2, max_components] |
| 2538 num_components = max(min(num_components, max_components), 2) | 3019 num_components = max(min(num_components, max_components), 2) |
| 2539 | 3020 |
| 2540 # component_size = ceil(file_size / num_components) | 3021 # component_size = ceil(file_size / num_components) |
| 2541 component_size = _DivideAndCeil(file_size, num_components) | 3022 component_size = DivideAndCeil(file_size, num_components) |
| 2542 return (num_components, component_size) | 3023 return (num_components, component_size) |
| 2543 | 3024 |
| 2544 | 3025 |
def _DeleteTempComponentObjectFn(cls, url_to_delete, thread_state=None):
  """Deletes one temporary component object; for use with command.Apply.

  Args:
    cls: Calling Command class (supplies the thread-local Cloud API).
    url_to_delete: StorageUrl naming the temporary object to remove.
    thread_state: Optional per-thread gsutil Cloud API instance.
  """
  api = GetCloudApiInstance(cls, thread_state)
  try:
    api.DeleteObject(url_to_delete.bucket_name,
                     url_to_delete.object_name,
                     generation=url_to_delete.generation,
                     provider=url_to_delete.scheme)
  except NotFoundException:
    # A lower-layer retry may have been issued even though the original
    # delete succeeded, so the object can legitimately be gone already.
    # The top-level command should still report success; swallow the error.
    pass
| 2551 | 3039 |
| 2552 | 3040 |
| 2553 def _ParseParallelUploadTrackerFile(tracker_file, tracker_file_lock): | 3041 def _ParseParallelUploadTrackerFile(tracker_file, tracker_file_lock): |
| 2554 """Parse the tracker file from the last parallel composite upload attempt. | 3042 """Parse the tracker file from the last parallel composite upload attempt. |
| 2555 | 3043 |
| 2556 If it exists, the tracker file is of the format described in | 3044 If it exists, the tracker file is of the format described in |
| 2557 _CreateParallelUploadTrackerFile. If the file doesn't exist or cannot be | 3045 _CreateParallelUploadTrackerFile. If the file doesn't exist or cannot be |
| 2558 read, then the upload will start from the beginning. | 3046 read, then the upload will start from the beginning. |
| 2559 | 3047 |
| 2560 Args: | 3048 Args: |
| (...skipping 217 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 2778 url.generation = tracker_object.generation | 3266 url.generation = tracker_object.generation |
| 2779 uploaded_components.append(url) | 3267 uploaded_components.append(url) |
| 2780 objects_already_chosen.append(tracker_object.object_name) | 3268 objects_already_chosen.append(tracker_object.object_name) |
| 2781 | 3269 |
| 2782 if uploaded_components: | 3270 if uploaded_components: |
| 2783 logging.info('Found %d existing temporary components to reuse.', | 3271 logging.info('Found %d existing temporary components to reuse.', |
| 2784 len(uploaded_components)) | 3272 len(uploaded_components)) |
| 2785 | 3273 |
| 2786 return (components_to_upload, uploaded_components, | 3274 return (components_to_upload, uploaded_components, |
| 2787 existing_objects_to_delete) | 3275 existing_objects_to_delete) |
| OLD | NEW |