| Index: third_party/gsutil/gslib/copy_helper.py
|
| diff --git a/third_party/gsutil/gslib/copy_helper.py b/third_party/gsutil/gslib/copy_helper.py
|
| index 456b15cf47a011e3c5b71a426c46dffeb520a833..aea4195bd028749f71ca361e5fd8a7dda37a9070 100644
|
| --- a/third_party/gsutil/gslib/copy_helper.py
|
| +++ b/third_party/gsutil/gslib/copy_helper.py
|
| @@ -27,7 +27,7 @@ from hashlib import md5
|
| import json
|
| import logging
|
| import mimetypes
|
| -import multiprocessing
|
| +from operator import attrgetter
|
| import os
|
| import pickle
|
| import random
|
| @@ -53,10 +53,13 @@ from gslib.cloud_api import ResumableDownloadException
|
| from gslib.cloud_api import ResumableUploadAbortException
|
| from gslib.cloud_api import ResumableUploadException
|
| from gslib.cloud_api import ResumableUploadStartOverException
|
| -from gslib.cloud_api_helper import GetDownloadSerializationDict
|
| +from gslib.cloud_api_helper import GetDownloadSerializationData
|
| from gslib.commands.compose import MAX_COMPOSE_ARITY
|
| from gslib.commands.config import DEFAULT_PARALLEL_COMPOSITE_UPLOAD_COMPONENT_SIZE
|
| from gslib.commands.config import DEFAULT_PARALLEL_COMPOSITE_UPLOAD_THRESHOLD
|
| +from gslib.commands.config import DEFAULT_SLICED_OBJECT_DOWNLOAD_COMPONENT_SIZE
|
| +from gslib.commands.config import DEFAULT_SLICED_OBJECT_DOWNLOAD_MAX_COMPONENTS
|
| +from gslib.commands.config import DEFAULT_SLICED_OBJECT_DOWNLOAD_THRESHOLD
|
| from gslib.cs_api_map import ApiSelector
|
| from gslib.daisy_chain_wrapper import DaisyChainWrapper
|
| from gslib.exception import CommandException
|
| @@ -65,11 +68,13 @@ from gslib.file_part import FilePart
|
| from gslib.hashing_helper import Base64EncodeHash
|
| from gslib.hashing_helper import CalculateB64EncodedMd5FromContents
|
| from gslib.hashing_helper import CalculateHashesFromContents
|
| +from gslib.hashing_helper import CHECK_HASH_IF_FAST_ELSE_FAIL
|
| +from gslib.hashing_helper import CHECK_HASH_NEVER
|
| +from gslib.hashing_helper import ConcatCrc32c
|
| from gslib.hashing_helper import GetDownloadHashAlgs
|
| from gslib.hashing_helper import GetUploadHashAlgs
|
| from gslib.hashing_helper import HashingFileUploadWrapper
|
| -from gslib.parallelism_framework_util import ThreadAndProcessSafeDict
|
| -from gslib.parallelism_framework_util import ThreadSafeDict
|
| +from gslib.parallelism_framework_util import AtomicDict
|
| from gslib.progress_callback import ConstructAnnounceText
|
| from gslib.progress_callback import FileProgressCallbackHandler
|
| from gslib.progress_callback import ProgressCallbackWithBackoff
|
| @@ -77,11 +82,13 @@ from gslib.resumable_streaming_upload import ResumableStreamingJsonUploadWrapper
|
| from gslib.storage_url import ContainsWildcard
|
| from gslib.storage_url import StorageUrlFromString
|
| from gslib.third_party.storage_apitools import storage_v1_messages as apitools_messages
|
| +from gslib.tracker_file import DeleteDownloadTrackerFiles
|
| from gslib.tracker_file import DeleteTrackerFile
|
| from gslib.tracker_file import GetTrackerFilePath
|
| from gslib.tracker_file import RaiseUnwritableTrackerFileException
|
| from gslib.tracker_file import ReadOrCreateDownloadTrackerFile
|
| from gslib.tracker_file import TrackerFileType
|
| +from gslib.tracker_file import WriteDownloadComponentTrackerFile
|
| from gslib.translation_helper import AddS3MarkerAclToObjectMetadata
|
| from gslib.translation_helper import CopyObjectMetadata
|
| from gslib.translation_helper import DEFAULT_CONTENT_TYPE
|
| @@ -89,8 +96,11 @@ from gslib.translation_helper import GenerationFromUrlAndString
|
| from gslib.translation_helper import ObjectMetadataFromHeaders
|
| from gslib.translation_helper import PreconditionsFromHeaders
|
| from gslib.translation_helper import S3MarkerAclFromObjectMetadata
|
| +from gslib.util import CheckFreeSpace
|
| +from gslib.util import CheckMultiprocessingAvailableAndInit
|
| from gslib.util import CreateLock
|
| from gslib.util import DEFAULT_FILE_BUFFER_SIZE
|
| +from gslib.util import DivideAndCeil
|
| from gslib.util import GetCloudApiInstance
|
| from gslib.util import GetFileSize
|
| from gslib.util import GetJsonResumableChunkSize
|
| @@ -102,39 +112,34 @@ from gslib.util import IS_WINDOWS
|
| from gslib.util import IsCloudSubdirPlaceholder
|
| from gslib.util import MakeHumanReadable
|
| from gslib.util import MIN_SIZE_COMPUTE_LOGGING
|
| -from gslib.util import MultiprocessingIsAvailable
|
| from gslib.util import ResumableThreshold
|
| from gslib.util import TEN_MIB
|
| +from gslib.util import UsingCrcmodExtension
|
| from gslib.util import UTF8
|
| from gslib.wildcard_iterator import CreateWildcardIterator
|
|
|
| # pylint: disable=g-import-not-at-top
|
| if IS_WINDOWS:
|
| import msvcrt
|
| - from ctypes import c_int
|
| - from ctypes import c_uint64
|
| - from ctypes import c_char_p
|
| - from ctypes import c_wchar_p
|
| - from ctypes import windll
|
| - from ctypes import POINTER
|
| - from ctypes import WINFUNCTYPE
|
| - from ctypes import WinError
|
|
|
| # Declare copy_helper_opts as a global because namedtuple isn't aware of
|
| # assigning to a class member (which breaks pickling done by multiprocessing).
|
| # For details see
|
| # http://stackoverflow.com/questions/16377215/how-to-pickle-a-namedtuple-instance-correctly
|
| -# Similarly can't pickle logger.
|
| # pylint: disable=global-at-module-level
|
| -global global_copy_helper_opts, global_logger
|
| +global global_copy_helper_opts
|
|
|
| # In-memory map of local files that are currently opened for write. Used to
|
| # ensure that if we write to the same file twice (say, for example, because the
|
| # user specified two identical source URLs), the writes occur serially.
|
| -global open_files_map
|
| +global open_files_map, open_files_lock
|
| open_files_map = (
|
| - ThreadSafeDict() if (IS_WINDOWS or not MultiprocessingIsAvailable()[0])
|
| - else ThreadAndProcessSafeDict(multiprocessing.Manager()))
|
| + AtomicDict() if not CheckMultiprocessingAvailableAndInit().is_available
|
| + else AtomicDict(manager=gslib.util.manager))
|
| +
|
| +# We don't allow multiple processes on Windows, so using a process-safe lock
|
| +# would be unnecessary.
|
| +open_files_lock = CreateLock()
|
|
|
| # For debugging purposes; if True, files and objects that fail hash validation
|
| # will be saved with the below suffix appended.
|
| @@ -178,6 +183,15 @@ PerformParallelUploadFileToObjectArgs = namedtuple(
|
| 'filename file_start file_length src_url dst_url canned_acl '
|
| 'content_type tracker_file tracker_file_lock')
|
|
|
| +PerformSlicedDownloadObjectToFileArgs = namedtuple(
|
| + 'PerformSlicedDownloadObjectToFileArgs',
|
| + 'component_num src_url src_obj_metadata dst_url download_file_name '
|
| + 'start_byte end_byte')
|
| +
|
| +PerformSlicedDownloadReturnValues = namedtuple(
|
| + 'PerformSlicedDownloadReturnValues',
|
| + 'component_num crc32c bytes_transferred server_encoding')
|
| +
|
| ObjectFromTracker = namedtuple('ObjectFromTracker',
|
| 'object_name generation')
|
|
|
| @@ -188,13 +202,23 @@ ObjectFromTracker = namedtuple('ObjectFromTracker',
|
| # Chunk size to use while zipping/unzipping gzip files.
|
| GZIP_CHUNK_SIZE = 8192
|
|
|
| +# Number of bytes to wait before updating a sliced download component tracker
|
| +# file.
|
| +TRACKERFILE_UPDATE_THRESHOLD = TEN_MIB
|
| +
|
| PARALLEL_COMPOSITE_SUGGESTION_THRESHOLD = 150 * 1024 * 1024
|
|
|
| # S3 requires special Multipart upload logic (that we currently don't implement)
|
| # for files > 5GiB in size.
|
| S3_MAX_UPLOAD_SIZE = 5 * 1024 * 1024 * 1024
|
|
|
| -suggested_parallel_composites = False
|
| +# TODO: Create a multiprocessing framework value allocator, then use it instead
|
| +# of a dict.
|
| +global suggested_sliced_transfers, suggested_sliced_transfers_lock
|
| +suggested_sliced_transfers = (
|
| + AtomicDict() if not CheckMultiprocessingAvailableAndInit().is_available
|
| + else AtomicDict(manager=gslib.util.manager))
|
| +suggested_sliced_transfers_lock = CreateLock()
|
|
|
|
|
| class FileConcurrencySkipError(Exception):
|
| @@ -206,7 +230,7 @@ def _RmExceptionHandler(cls, e):
|
| cls.logger.error(str(e))
|
|
|
|
|
| -def _ParallelUploadCopyExceptionHandler(cls, e):
|
| +def _ParallelCopyExceptionHandler(cls, e):
|
| """Simple exception handler to allow post-completion status."""
|
| cls.logger.error(str(e))
|
| cls.op_failure_count += 1
|
| @@ -248,7 +272,7 @@ def _PerformParallelUploadFileToObject(cls, args, thread_state=None):
|
| ret = _UploadFileToObject(args.src_url, fp, args.file_length,
|
| args.dst_url, dst_object_metadata,
|
| preconditions, gsutil_api, cls.logger, cls,
|
| - _ParallelUploadCopyExceptionHandler,
|
| + _ParallelCopyExceptionHandler,
|
| gzip_exts=None, allow_splitting=False)
|
| finally:
|
| if global_copy_helper_opts.canned_acl:
|
| @@ -641,23 +665,23 @@ def _CreateDigestsFromDigesters(digesters):
|
| return digests
|
|
|
|
|
| -def _CreateDigestsFromLocalFile(logger, algs, file_name, src_obj_metadata):
|
| +def _CreateDigestsFromLocalFile(logger, algs, file_name, final_file_name,
|
| + src_obj_metadata):
|
| """Creates a base64 CRC32C and/or MD5 digest from file_name.
|
|
|
| Args:
|
| - logger: for outputting log messages.
|
| - algs: list of algorithms to compute.
|
| - file_name: file to digest.
|
| - src_obj_metadata: metadta of source object.
|
| + logger: For outputting log messages.
|
| + algs: List of algorithms to compute.
|
| + file_name: File to digest.
|
| + final_file_name: Permanent location to be used for the downloaded file
|
| + after validation (used for logging).
|
| + src_obj_metadata: Metadata of source object.
|
|
|
| Returns:
|
| Dict of algorithm name : base 64 encoded digest
|
| """
|
| hash_dict = {}
|
| if 'md5' in algs:
|
| - if src_obj_metadata.size and src_obj_metadata.size > TEN_MIB:
|
| - logger.info(
|
| - 'Computing MD5 for %s...', file_name)
|
| hash_dict['md5'] = md5()
|
| if 'crc32c' in algs:
|
| hash_dict['crc32c'] = crcmod.predefined.Crc('crc-32c')
|
| @@ -666,7 +690,8 @@ def _CreateDigestsFromLocalFile(logger, algs, file_name, src_obj_metadata):
|
| fp, hash_dict, ProgressCallbackWithBackoff(
|
| src_obj_metadata.size,
|
| FileProgressCallbackHandler(
|
| - ConstructAnnounceText('Hashing', file_name), logger).call))
|
| + ConstructAnnounceText('Hashing', final_file_name),
|
| + logger).call))
|
| digests = {}
|
| for alg_name, digest in hash_dict.iteritems():
|
| digests[alg_name] = Base64EncodeHash(digest.hexdigest())
|
| @@ -730,7 +755,8 @@ def _CheckHashes(logger, obj_url, obj_metadata, file_name, digests,
|
| logger: for outputting log messages.
|
| obj_url: CloudUrl for cloud object.
|
| obj_metadata: Cloud Object being downloaded from or uploaded to.
|
| - file_name: Local file name on disk being downloaded to or uploaded from.
|
| + file_name: Local file name on disk being downloaded to or uploaded from
|
| + (used only for logging).
|
| digests: Computed Digests for the object.
|
| is_upload: If true, comparing for an uploaded object (controls logging).
|
|
|
| @@ -985,9 +1011,10 @@ def _DoParallelCompositeUpload(fp, src_url, dst_url, dst_obj_metadata,
|
| # those that were uploaded by a previous, failed run and have since
|
| # changed (but still have an old generation lying around).
|
| objects_to_delete = components + existing_objects_to_delete
|
| - command_obj.Apply(_DeleteObjectFn, objects_to_delete, _RmExceptionHandler,
|
| - arg_checker=gslib.command.DummyArgChecker,
|
| - parallel_operations_override=True)
|
| + command_obj.Apply(
|
| + _DeleteTempComponentObjectFn, objects_to_delete, _RmExceptionHandler,
|
| + arg_checker=gslib.command.DummyArgChecker,
|
| + parallel_operations_override=True)
|
| except Exception: # pylint: disable=broad-except
|
| # If some of the delete calls fail, don't cause the whole command to
|
| # fail. The copy was successful iff the compose call succeeded, so
|
| @@ -1025,7 +1052,7 @@ def _ShouldDoParallelCompositeUpload(logger, allow_splitting, src_url, dst_url,
|
| Returns:
|
| True iff a parallel upload should be performed on the source file.
|
| """
|
| - global suggested_parallel_composites
|
| + global suggested_sliced_transfers, suggested_sliced_transfers_lock
|
| parallel_composite_upload_threshold = HumanReadableToBytes(config.get(
|
| 'GSUtil', 'parallel_composite_upload_threshold',
|
| DEFAULT_PARALLEL_COMPOSITE_UPLOAD_THRESHOLD))
|
| @@ -1041,17 +1068,18 @@ def _ShouldDoParallelCompositeUpload(logger, allow_splitting, src_url, dst_url,
|
| # TODO: Once compiled crcmod is being distributed by major Linux distributions
|
| # remove this check.
|
| if (all_factors_but_size and parallel_composite_upload_threshold == 0
|
| - and file_size >= PARALLEL_COMPOSITE_SUGGESTION_THRESHOLD
|
| - and not suggested_parallel_composites):
|
| - logger.info('\n'.join(textwrap.wrap(
|
| - '==> NOTE: You are uploading one or more large file(s), which would '
|
| - 'run significantly faster if you enable parallel composite uploads. '
|
| - 'This feature can be enabled by editing the '
|
| - '"parallel_composite_upload_threshold" value in your .boto '
|
| - 'configuration file. However, note that if you do this you and any '
|
| - 'users that download such composite files will need to have a compiled '
|
| - 'crcmod installed (see "gsutil help crcmod").')) + '\n')
|
| - suggested_parallel_composites = True
|
| + and file_size >= PARALLEL_COMPOSITE_SUGGESTION_THRESHOLD):
|
| + with suggested_sliced_transfers_lock:
|
| + if not suggested_sliced_transfers.get('suggested'):
|
| + logger.info('\n'.join(textwrap.wrap(
|
| + '==> NOTE: You are uploading one or more large file(s), which '
|
| + 'would run significantly faster if you enable parallel composite '
|
| + 'uploads. This feature can be enabled by editing the '
|
| + '"parallel_composite_upload_threshold" value in your .boto '
|
| + 'configuration file. However, note that if you do this you and any '
|
| + 'users that download such composite files will need to have a '
|
| + 'compiled crcmod installed (see "gsutil help crcmod").')) + '\n')
|
| + suggested_sliced_transfers['suggested'] = True
|
|
|
| return (all_factors_but_size
|
| and parallel_composite_upload_threshold > 0
|
| @@ -1104,34 +1132,45 @@ def ExpandUrlToSingleBlr(url_str, gsutil_api, debug, project_id,
|
| if storage_url.IsBucket():
|
| return (storage_url, True)
|
|
|
| - # For object/prefix URLs check 3 cases: (a) if the name ends with '/' treat
|
| - # as a subdir; otherwise, use the wildcard iterator with url to
|
| - # find if (b) there's a Prefix matching url, or (c) name is of form
|
| - # dir_$folder$ (and in both these cases also treat dir as a subdir).
|
| - # Cloud subdirs are always considered to be an existing container.
|
| - if IsCloudSubdirPlaceholder(storage_url):
|
| - return (storage_url, True)
|
| + # For object/prefix URLs, there are four cases that indicate the destination
|
| + # is a cloud subdirectory; these are always considered to be an existing
|
| + # container. Checking each case allows gsutil to provide Unix-like
|
| + # destination folder semantics, but requires up to three HTTP calls, noted
|
| + # below.
|
|
|
| - # Check for the special case where we have a folder marker object.
|
| - folder_expansion = CreateWildcardIterator(
|
| - storage_url.versionless_url_string + '_$folder$', gsutil_api,
|
| - debug=debug, project_id=project_id).IterAll(
|
| - bucket_listing_fields=['name'])
|
| - for blr in folder_expansion:
|
| + # Case 1: If a placeholder object ending with '/' exists.
|
| + if IsCloudSubdirPlaceholder(storage_url):
|
| return (storage_url, True)
|
|
|
| - blr_expansion = CreateWildcardIterator(url_str, gsutil_api,
|
| - debug=debug,
|
| - project_id=project_id).IterAll(
|
| - bucket_listing_fields=['name'])
|
| + # HTTP call to make an eventually consistent check for a matching prefix,
|
| + # _$folder$, or empty listing.
|
| expansion_empty = True
|
| - for blr in blr_expansion:
|
| + list_iterator = gsutil_api.ListObjects(
|
| + storage_url.bucket_name, prefix=storage_url.object_name, delimiter='/',
|
| + provider=storage_url.scheme, fields=['prefixes', 'items/name'])
|
| + for obj_or_prefix in list_iterator:
|
| + # To conserve HTTP calls for the common case, we make a single listing
|
| + # that covers prefixes and object names. Listing object names covers the
|
| + # _$folder$ case and the nonexistent-object-as-subdir case. However, if
|
| + # there are many existing objects for which the target URL is an exact
|
| + # prefix, this listing could be paginated and span multiple HTTP calls.
|
| + # If this case becomes common, we could heuristically abort the
|
| + # listing operation after the first page of results and just query for the
|
| + # _$folder$ object directly using GetObjectMetadata.
|
| expansion_empty = False
|
| - if blr.IsPrefix():
|
| +
|
| + if obj_or_prefix.datatype == CloudApi.CsObjectOrPrefixType.PREFIX:
|
| + # Case 2: If there is a matching prefix when listing the destination URL.
|
| + return (storage_url, True)
|
| + elif (obj_or_prefix.datatype == CloudApi.CsObjectOrPrefixType.OBJECT and
|
| + obj_or_prefix.data.name == storage_url.object_name + '_$folder$'):
|
| + # Case 3: If a placeholder object matching destination + _$folder$
|
| + # exists.
|
| return (storage_url, True)
|
|
|
| - return (storage_url,
|
| - expansion_empty and treat_nonexistent_object_as_subdir)
|
| + # Case 4: If no objects/prefixes matched, and nonexistent objects should be
|
| + # treated as subdirectories.
|
| + return (storage_url, expansion_empty and treat_nonexistent_object_as_subdir)
|
|
|
|
|
| def FixWindowsNaming(src_url, dst_url):
|
| @@ -1242,46 +1281,6 @@ def _CopyObjToObjInTheCloud(src_url, src_obj_metadata, dst_url,
|
| dst_obj.md5Hash)
|
|
|
|
|
| -def _CheckFreeSpace(path):
|
| - """Return path/drive free space (in bytes)."""
|
| - if IS_WINDOWS:
|
| - # pylint: disable=g-import-not-at-top
|
| - try:
|
| - # pylint: disable=invalid-name
|
| - get_disk_free_space_ex = WINFUNCTYPE(c_int, c_wchar_p,
|
| - POINTER(c_uint64),
|
| - POINTER(c_uint64),
|
| - POINTER(c_uint64))
|
| - get_disk_free_space_ex = get_disk_free_space_ex(
|
| - ('GetDiskFreeSpaceExW', windll.kernel32), (
|
| - (1, 'lpszPathName'),
|
| - (2, 'lpFreeUserSpace'),
|
| - (2, 'lpTotalSpace'),
|
| - (2, 'lpFreeSpace'),))
|
| - except AttributeError:
|
| - get_disk_free_space_ex = WINFUNCTYPE(c_int, c_char_p,
|
| - POINTER(c_uint64),
|
| - POINTER(c_uint64),
|
| - POINTER(c_uint64))
|
| - get_disk_free_space_ex = get_disk_free_space_ex(
|
| - ('GetDiskFreeSpaceExA', windll.kernel32), (
|
| - (1, 'lpszPathName'),
|
| - (2, 'lpFreeUserSpace'),
|
| - (2, 'lpTotalSpace'),
|
| - (2, 'lpFreeSpace'),))
|
| -
|
| - def GetDiskFreeSpaceExErrCheck(result, unused_func, args):
|
| - if not result:
|
| - raise WinError()
|
| - return args[1].value
|
| - get_disk_free_space_ex.errcheck = GetDiskFreeSpaceExErrCheck
|
| -
|
| - return get_disk_free_space_ex(os.getenv('SystemDrive'))
|
| - else:
|
| - (_, f_frsize, _, _, f_bavail, _, _, _, _, _) = os.statvfs(path)
|
| - return f_frsize * f_bavail
|
| -
|
| -
|
| def _SetContentTypeFromFile(src_url, dst_obj_metadata):
|
| """Detects and sets Content-Type if src_url names a local file.
|
|
|
| @@ -1509,7 +1508,7 @@ def _CompressFileForUpload(src_url, src_obj_filestream, src_obj_size, logger):
|
| # Check for temp space. Assume the compressed object is at most 2x
|
| # the size of the object (normally should compress to smaller than
|
| # the object)
|
| - if _CheckFreeSpace(gzip_path) < 2*int(src_obj_size):
|
| + if CheckFreeSpace(gzip_path) < 2*int(src_obj_size):
|
| raise CommandException('Inadequate temp space available to compress '
|
| '%s. See the CHANGING TEMP DIRECTORIES section '
|
| 'of "gsutil help cp" for more info.' % src_url)
|
| @@ -1657,30 +1656,26 @@ def _UploadFileToObject(src_url, src_obj_filestream, src_obj_size,
|
| uploaded_object.md5Hash)
|
|
|
|
|
| -# TODO: Refactor this long function into smaller pieces.
|
| -# pylint: disable=too-many-statements
|
| -def _DownloadObjectToFile(src_url, src_obj_metadata, dst_url,
|
| - gsutil_api, logger, test_method=None):
|
| - """Downloads an object to a local file.
|
| +def _GetDownloadFile(dst_url, src_obj_metadata, logger):
|
| + """Creates a new download file, and deletes the file that will be replaced.
|
| +
|
| + Names and creates a temporary file for this download. Also, if there is an
|
| + existing file at the path where this file will be placed after the download
|
| + is completed, that file will be deleted.
|
|
|
| Args:
|
| - src_url: Source CloudUrl.
|
| - src_obj_metadata: Metadata from the source object.
|
| dst_url: Destination FileUrl.
|
| - gsutil_api: gsutil Cloud API instance to use for the download.
|
| + src_obj_metadata: Metadata from the source object.
|
| logger: for outputting log messages.
|
| - test_method: Optional test method for modifying the file before validation
|
| - during unit tests.
|
| - Returns:
|
| - (elapsed_time, bytes_transferred, dst_url, md5), excluding overhead like
|
| - initial GET.
|
|
|
| - Raises:
|
| - CommandException: if errors encountered.
|
| + Returns:
|
| + (download_file_name, need_to_unzip)
|
| + download_file_name: The name of the temporary file to which the object will
|
| + be downloaded.
|
| + need_to_unzip: If true, a temporary zip file was used and must be
|
| + uncompressed as part of validation.
|
| """
|
| - global open_files_map
|
| - file_name = dst_url.object_name
|
| - dir_name = os.path.dirname(file_name)
|
| + dir_name = os.path.dirname(dst_url.object_name)
|
| if dir_name and not os.path.exists(dir_name):
|
| # Do dir creation in try block so can ignore case where dir already
|
| # exists. This is needed to avoid a race condition when running gsutil
|
| @@ -1690,120 +1685,489 @@ def _DownloadObjectToFile(src_url, src_obj_metadata, dst_url,
|
| except OSError, e:
|
| if e.errno != errno.EEXIST:
|
| raise
|
| - api_selector = gsutil_api.GetApiSelector(provider=src_url.scheme)
|
| +
|
| + need_to_unzip = False
|
| # For gzipped objects download to a temp file and unzip. For the XML API,
|
| - # the represents the result of a HEAD request. For the JSON API, this is
|
| + # this represents the result of a HEAD request. For the JSON API, this is
|
| # the stored encoding which the service may not respect. However, if the
|
| # server sends decompressed bytes for a file that is stored compressed
|
| # (double compressed case), there is no way we can validate the hash and
|
| # we will fail our hash check for the object.
|
| if (src_obj_metadata.contentEncoding and
|
| src_obj_metadata.contentEncoding.lower().endswith('gzip')):
|
| - # We can't use tempfile.mkstemp() here because we need a predictable
|
| - # filename for resumable downloads.
|
| - download_file_name = _GetDownloadZipFileName(file_name)
|
| + need_to_unzip = True
|
| + download_file_name = _GetDownloadTempZipFileName(dst_url)
|
| logger.info(
|
| 'Downloading to temp gzip filename %s', download_file_name)
|
| - need_to_unzip = True
|
| else:
|
| - download_file_name = file_name
|
| - need_to_unzip = False
|
| + download_file_name = _GetDownloadTempFileName(dst_url)
|
|
|
| - if download_file_name.endswith(dst_url.delim):
|
| - logger.warn('\n'.join(textwrap.wrap(
|
| - 'Skipping attempt to download to filename ending with slash (%s). This '
|
| - 'typically happens when using gsutil to download from a subdirectory '
|
| - 'created by the Cloud Console (https://cloud.google.com/console)'
|
| - % download_file_name)))
|
| - return (0, 0, dst_url, '')
|
| + # If a file exists at the permanent destination (where the file will be moved
|
| + # after the download is completed), delete it here to reduce disk space
|
| + # requirements.
|
| + if os.path.exists(dst_url.object_name):
|
| + os.unlink(dst_url.object_name)
|
|
|
| - # Set up hash digesters.
|
| + # Downloads open the temporary download file in r+b mode, which requires it
|
| + # to already exist, so we create it here if it doesn't exist already.
|
| + fp = open(download_file_name, 'ab')
|
| + fp.close()
|
| + return download_file_name, need_to_unzip
|
| +
|
| +
|
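| A minimal standalone sketch of the temporary-file handling above (the helper name is hypothetical; the '_.gstmp'/'_.gztmp' suffixes come from the patch):
|
|     def demo_temp_download_name(final_path, gzip_encoded):
|         # Derive the temporary download name; gzip-encoded objects get a
|         # '_.gztmp' name so they can be unzipped after validation.
|         suffix = '_.gztmp' if gzip_encoded else '_.gstmp'
|         temp_path = final_path + suffix
|         # Pre-create the file so later code can open it in 'r+b' and seek.
|         open(temp_path, 'ab').close()
|         return temp_path
|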
| +def _ShouldDoSlicedDownload(download_strategy, src_obj_metadata,
|
| + allow_splitting, logger):
|
| + """Determines whether the sliced download strategy should be used.
|
| +
|
| + Args:
|
| + download_strategy: CloudApi download strategy.
|
| + src_obj_metadata: Metadata from the source object.
|
| + allow_splitting: If false, then this function returns false.
|
| + logger: logging.Logger for log message output.
|
| +
|
| + Returns:
|
| + True iff a sliced download should be performed on the source file.
|
| + """
|
| + sliced_object_download_threshold = HumanReadableToBytes(config.get(
|
| + 'GSUtil', 'sliced_object_download_threshold',
|
| + DEFAULT_SLICED_OBJECT_DOWNLOAD_THRESHOLD))
|
| +
|
| + max_components = config.getint(
|
| + 'GSUtil', 'sliced_object_download_max_components',
|
| + DEFAULT_SLICED_OBJECT_DOWNLOAD_MAX_COMPONENTS)
|
| +
|
| + # Don't use sliced download if it will prevent us from performing an
|
| + # integrity check.
|
| + check_hashes_config = config.get(
|
| + 'GSUtil', 'check_hashes', CHECK_HASH_IF_FAST_ELSE_FAIL)
|
| + parallel_hashing = src_obj_metadata.crc32c and UsingCrcmodExtension(crcmod)
|
| + hashing_okay = parallel_hashing or check_hashes_config == CHECK_HASH_NEVER
|
| +
|
| + use_slice = (
|
| + allow_splitting
|
| + and download_strategy is not CloudApi.DownloadStrategy.ONE_SHOT
|
| + and max_components > 1
|
| + and hashing_okay
|
| + and sliced_object_download_threshold > 0
|
| + and src_obj_metadata.size >= sliced_object_download_threshold)
|
| +
|
| + if (not use_slice
|
| + and src_obj_metadata.size >= PARALLEL_COMPOSITE_SUGGESTION_THRESHOLD
|
| + and not UsingCrcmodExtension(crcmod)
|
| + and check_hashes_config != CHECK_HASH_NEVER):
|
| + with suggested_sliced_transfers_lock:
|
| + if not suggested_sliced_transfers.get('suggested'):
|
| + logger.info('\n'.join(textwrap.wrap(
|
| + '==> NOTE: You are downloading one or more large file(s), which '
|
| + 'would run significantly faster if you enabled sliced object '
|
| + 'downloads. This feature is enabled by default but requires that '
|
| + 'compiled crcmod be installed (see "gsutil help crcmod").')) + '\n')
|
| + suggested_sliced_transfers['suggested'] = True
|
| +
|
| + return use_slice
|
| +
|
| +
|
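| The gate above reduces to a small predicate over the boto config values and crcmod availability; a hedged standalone sketch (function and parameter names are illustrative, and 'never' stands in for CHECK_HASH_NEVER):
|
|     def should_slice(object_size, threshold_bytes, max_components,
|                      compiled_crcmod_available, check_hashes_config,
|                      allow_splitting=True, resumable_strategy=True):
|         # Never slice if doing so would force us to skip integrity checking
|         # (no compiled crcmod and hash checking not disabled).
|         hashing_okay = compiled_crcmod_available or check_hashes_config == 'never'
|         return (allow_splitting and resumable_strategy and max_components > 1
|                 and hashing_okay and threshold_bytes > 0
|                 and object_size >= threshold_bytes)
|
|     # Example: a 200 MiB object, 150 MiB threshold, compiled crcmod present.
|     assert should_slice(200 * 1024 * 1024, 150 * 1024 * 1024, 4,
|                         True, 'if_fast_else_fail')
|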
| +def _PerformSlicedDownloadObjectToFile(cls, args, thread_state=None):
|
| + """Function argument to Apply for performing sliced downloads.
|
| +
|
| + Args:
|
| + cls: Calling Command class.
|
| + args: PerformSlicedDownloadObjectToFileArgs tuple describing the target.
|
| + thread_state: gsutil Cloud API instance to use for the operation.
|
| +
|
| + Returns:
|
| + PerformSlicedDownloadReturnValues named-tuple filled with:
|
| + component_num: The component number for this download.
|
| + crc32c: CRC32C hash value (integer) of the downloaded bytes.
|
| + bytes_transferred: The number of bytes transferred, potentially less
|
| + than the component size if the download was resumed.
|
| + server_encoding: Content-encoding string if it was detected that the
|
| + server sent encoded bytes during transfer, None otherwise.
|
| + """
|
| + gsutil_api = GetCloudApiInstance(cls, thread_state=thread_state)
|
| hash_algs = GetDownloadHashAlgs(
|
| - logger, src_has_md5=src_obj_metadata.md5Hash,
|
| - src_has_crc32c=src_obj_metadata.crc32c)
|
| + cls.logger, consider_crc32c=args.src_obj_metadata.crc32c)
|
| digesters = dict((alg, hash_algs[alg]()) for alg in hash_algs or {})
|
|
|
| - fp = None
|
| - # Tracks whether the server used a gzip encoding.
|
| - server_encoding = None
|
| - download_complete = False
|
| - download_strategy = _SelectDownloadStrategy(dst_url)
|
| - download_start_point = 0
|
| - # This is used for resuming downloads, but also for passing the mediaLink
|
| - # and size into the download for new downloads so that we can avoid
|
| - # making an extra HTTP call.
|
| - serialization_data = None
|
| - serialization_dict = GetDownloadSerializationDict(src_obj_metadata)
|
| - open_files = []
|
| + (bytes_transferred, server_encoding) = (
|
| + _DownloadObjectToFileResumable(args.src_url, args.src_obj_metadata,
|
| + args.dst_url, args.download_file_name,
|
| + gsutil_api, cls.logger, digesters,
|
| + component_num=args.component_num,
|
| + start_byte=args.start_byte,
|
| + end_byte=args.end_byte))
|
| +
|
| + crc32c_val = None
|
| + if 'crc32c' in digesters:
|
| + crc32c_val = digesters['crc32c'].crcValue
|
| + return PerformSlicedDownloadReturnValues(
|
| + args.component_num, crc32c_val, bytes_transferred, server_encoding)
|
| +
|
| +
|
| +def _MaintainSlicedDownloadTrackerFiles(src_obj_metadata, dst_url,
|
| + download_file_name, logger,
|
| + api_selector, num_components):
|
| + """Maintains sliced download tracker files in order to permit resumability.
|
| +
|
| + Reads or creates a sliced download tracker file representing this object
|
| + download. Upon an attempt at cross-process resumption, the contents of the
|
| + sliced download tracker file are verified to make sure a resumption is
|
| + possible and appropriate. In the case that a resumption should not be
|
| + attempted, existing component tracker files are deleted (to prevent child
|
| + processes from attempting resumption), and a new sliced download tracker
|
| + file is created.
|
| +
|
| + Args:
|
| + src_obj_metadata: Metadata from the source object. Must include etag and
|
| + generation.
|
| + dst_url: Destination FileUrl.
|
| + download_file_name: Temporary file name to be used for the download.
|
| + logger: for outputting log messages.
|
| + api_selector: The Cloud API implementation used.
|
| + num_components: The number of components to perform this download with.
|
| + """
|
| + assert src_obj_metadata.etag
|
| + fp = None
|
| + tracker_file = None
|
| +
|
| + # This can only happen if the resumable threshold is set higher than the
|
| + # parallel transfer threshold.
|
| + if src_obj_metadata.size < ResumableThreshold():
|
| + return
|
| +
|
| + tracker_file_name = GetTrackerFilePath(dst_url,
|
| + TrackerFileType.SLICED_DOWNLOAD,
|
| + api_selector)
|
| +
|
| + # Check to see if we should attempt resuming the download.
|
| try:
|
| - if download_strategy is CloudApi.DownloadStrategy.ONE_SHOT:
|
| - fp = open(download_file_name, 'wb')
|
| - elif download_strategy is CloudApi.DownloadStrategy.RESUMABLE:
|
| - # If this is a resumable download, we need to open the file for append and
|
| - # manage a tracker file.
|
| - if open_files_map.get(download_file_name, False):
|
| - # Ensure another process/thread is not already writing to this file.
|
| - raise FileConcurrencySkipError
|
| - open_files.append(download_file_name)
|
| - open_files_map[download_file_name] = True
|
| - fp = open(download_file_name, 'ab')
|
| -
|
| - resuming = ReadOrCreateDownloadTrackerFile(
|
| - src_obj_metadata, dst_url, api_selector)
|
| - if resuming:
|
| - # Find out how far along we are so we can request the appropriate
|
| - # remaining range of the object.
|
| - existing_file_size = GetFileSize(fp, position_to_eof=True)
|
| - if existing_file_size > src_obj_metadata.size:
|
| - DeleteTrackerFile(GetTrackerFilePath(
|
| - dst_url, TrackerFileType.DOWNLOAD, api_selector))
|
| - raise CommandException(
|
| - '%s is larger (%d) than %s (%d).\nDeleting tracker file, so '
|
| - 'if you re-try this download it will start from scratch' %
|
| - (download_file_name, existing_file_size, src_url.object_name,
|
| - src_obj_metadata.size))
|
| - else:
|
| - if existing_file_size == src_obj_metadata.size:
|
| - logger.info(
|
| - 'Download already complete for file %s, skipping download but '
|
| - 'will run integrity checks.', download_file_name)
|
| - download_complete = True
|
| - else:
|
| - download_start_point = existing_file_size
|
| - serialization_dict['progress'] = download_start_point
|
| - logger.info('Resuming download for %s', src_url.url_string)
|
| - # Catch up our digester with the hash data.
|
| - if existing_file_size > TEN_MIB:
|
| - for alg_name in digesters:
|
| - logger.info(
|
| - 'Catching up %s for %s', alg_name, download_file_name)
|
| - with open(download_file_name, 'rb') as hash_fp:
|
| - while True:
|
| - data = hash_fp.read(DEFAULT_FILE_BUFFER_SIZE)
|
| - if not data:
|
| - break
|
| - for alg_name in digesters:
|
| - digesters[alg_name].update(data)
|
| + fp = open(download_file_name, 'rb')
|
| + existing_file_size = GetFileSize(fp)
|
| + # A parallel resumption should be attempted only if the destination file
|
| + # size is exactly the same as the source size and the tracker file matches.
|
| + if existing_file_size == src_obj_metadata.size:
|
| + tracker_file = open(tracker_file_name, 'r')
|
| + tracker_file_data = json.load(tracker_file)
|
| + if (tracker_file_data['etag'] == src_obj_metadata.etag and
|
| + tracker_file_data['generation'] == src_obj_metadata.generation and
|
| + tracker_file_data['num_components'] == num_components):
|
| + return
|
| else:
|
| - # Starting a new download, blow away whatever is already there.
|
| - fp.truncate(0)
|
| - fp.seek(0)
|
| + tracker_file.close()
|
| + logger.warn('Sliced download tracker file doesn\'t match for '
|
| + 'download of %s. Restarting download from scratch.' %
|
| + dst_url.object_name)
|
| +
|
| + except (IOError, ValueError) as e:
|
| + # Ignore non-existent file (happens first time a download
|
| + # is attempted on an object), but warn user for other errors.
|
| + if isinstance(e, ValueError) or e.errno != errno.ENOENT:
|
| + logger.warn('Couldn\'t read sliced download tracker file (%s): %s. '
|
| + 'Restarting download from scratch.' %
|
| + (tracker_file_name, str(e)))
|
| + finally:
|
| + if fp:
|
| + fp.close()
|
| + if tracker_file:
|
| + tracker_file.close()
|
| +
|
| + # Delete component tracker files to guarantee download starts from scratch.
|
| + DeleteDownloadTrackerFiles(dst_url, api_selector)
|
| +
|
| + # Create a new sliced download tracker file to represent this download.
|
| + try:
|
| + with open(tracker_file_name, 'w') as tracker_file:
|
| + tracker_file_data = {'etag': src_obj_metadata.etag,
|
| + 'generation': src_obj_metadata.generation,
|
| + 'num_components': num_components}
|
| + tracker_file.write(json.dumps(tracker_file_data))
|
| + except IOError as e:
|
| + RaiseUnwritableTrackerFileException(tracker_file_name, e.strerror)
|
| +
|
| +
|
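| The sliced download tracker file maintained above is a small JSON record; a standalone sketch of the read/validate/rewrite cycle, using the field names shown in the patch (etag, generation, num_components) but hypothetical helper names:
|
|     import json
|
|     def tracker_matches(tracker_path, etag, generation, num_components):
|         # True if an existing tracker describes the same object and slicing
|         # layout, meaning a cross-process resumption may be attempted.
|         try:
|             with open(tracker_path, 'r') as fp:
|                 data = json.load(fp)
|         except (IOError, ValueError):
|             return False
|         return (data.get('etag') == etag and
|                 data.get('generation') == generation and
|                 data.get('num_components') == num_components)
|
|     def write_tracker(tracker_path, etag, generation, num_components):
|         with open(tracker_path, 'w') as fp:
|             json.dump({'etag': etag, 'generation': generation,
|                        'num_components': num_components}, fp)
|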
| +class SlicedDownloadFileWrapper(object):
|
| + """Wraps a file object to be used in GetObjectMedia for sliced downloads.
|
|
|
| + In order to allow resumability, the file object used by each thread in a
|
| + sliced object download should be wrapped using SlicedDownloadFileWrapper.
|
| + Passing a SlicedDownloadFileWrapper object to GetObjectMedia will allow the
|
| + download component tracker file for this component to be updated periodically,
|
| + while the downloaded bytes are normally written to file.
|
| + """
|
| +
|
| + def __init__(self, fp, tracker_file_name, src_obj_metadata, start_byte,
|
| + end_byte):
|
| + """Initializes the SlicedDownloadFileWrapper.
|
| +
|
| + Args:
|
| + fp: The already-open file object to be used for writing in
|
| + GetObjectMedia. Data will be written to file starting at the current
|
| + seek position.
|
| + tracker_file_name: The name of the tracker file for this component.
|
| + src_obj_metadata: Metadata from the source object. Must include etag and
|
| + generation.
|
| + start_byte: The first byte to be downloaded for this parallel component.
|
| + end_byte: The last byte to be downloaded for this parallel component.
|
| + """
|
| + self._orig_fp = fp
|
| + self._tracker_file_name = tracker_file_name
|
| + self._src_obj_metadata = src_obj_metadata
|
| + self._last_tracker_file_byte = None
|
| + self._start_byte = start_byte
|
| + self._end_byte = end_byte
|
| +
|
| + def write(self, data): # pylint: disable=invalid-name
|
| + current_file_pos = self._orig_fp.tell()
|
| + assert (self._start_byte <= current_file_pos and
|
| + current_file_pos + len(data) <= self._end_byte + 1)
|
| +
|
| + self._orig_fp.write(data)
|
| + current_file_pos = self._orig_fp.tell()
|
| +
|
| + threshold = TRACKERFILE_UPDATE_THRESHOLD
|
| + if (self._last_tracker_file_byte is None or
|
| + current_file_pos - self._last_tracker_file_byte > threshold or
|
| + current_file_pos == self._end_byte + 1):
|
| + WriteDownloadComponentTrackerFile(
|
| + self._tracker_file_name, self._src_obj_metadata, current_file_pos)
|
| + self._last_tracker_file_byte = current_file_pos
|
| +
|
| + def seek(self, offset, whence=os.SEEK_SET): # pylint: disable=invalid-name
|
| + if whence == os.SEEK_END:
|
| + self._orig_fp.seek(offset + self._end_byte + 1)
|
| else:
|
| - raise CommandException('Invalid download strategy %s chosen for'
|
| - 'file %s' % (download_strategy, fp.name))
|
| + self._orig_fp.seek(offset, whence)
|
| + assert self._start_byte <= self._orig_fp.tell() <= self._end_byte + 1
|
| +
|
| + def tell(self): # pylint: disable=invalid-name
|
| + return self._orig_fp.tell()
|
| +
|
| + def flush(self): # pylint: disable=invalid-name
|
| + self._orig_fp.flush()
|
| +
|
| + def close(self): # pylint: disable=invalid-name
|
| + if self._orig_fp:
|
| + self._orig_fp.close()
|
| +
|
| +
|
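| SlicedDownloadFileWrapper is essentially a checkpointing write-through wrapper; a toy self-contained version of the same idea (not gsutil's class, and the checkpoint callback is hypothetical):
|
|     class CheckpointingWriter(object):
|         """Write-through wrapper that records the file position periodically."""
|
|         def __init__(self, fp, checkpoint_fn, threshold=10 * 1024 * 1024):
|             self._fp = fp
|             self._checkpoint_fn = checkpoint_fn  # e.g. persist progress to a tracker file
|             self._threshold = threshold
|             self._last_checkpoint = None
|
|         def write(self, data):
|             self._fp.write(data)
|             pos = self._fp.tell()
|             if (self._last_checkpoint is None or
|                 pos - self._last_checkpoint > self._threshold):
|                 self._checkpoint_fn(pos)
|                 self._last_checkpoint = pos
|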
| +def _PartitionObject(src_url, src_obj_metadata, dst_url,
|
| + download_file_name):
|
| + """Partitions an object into components to be downloaded.
|
| +
|
| + Each component is a byte range of the object. The byte ranges
|
| + of the returned components are mutually exclusive and collectively
|
| + exhaustive. The byte ranges are inclusive at both end points.
|
| +
|
| + Args:
|
| + src_url: Source CloudUrl.
|
| + src_obj_metadata: Metadata from the source object.
|
| + dst_url: Destination FileUrl.
|
| + download_file_name: Temporary file name to be used for the download.
|
| +
|
| + Returns:
|
| + components_to_download: A list of PerformSlicedDownloadObjectToFileArgs
|
| + to be used in Apply for the sliced download.
|
| + """
|
| + sliced_download_component_size = HumanReadableToBytes(
|
| + config.get('GSUtil', 'sliced_object_download_component_size',
|
| + DEFAULT_SLICED_OBJECT_DOWNLOAD_COMPONENT_SIZE))
|
| +
|
| + max_components = config.getint(
|
| + 'GSUtil', 'sliced_object_download_max_components',
|
| + DEFAULT_SLICED_OBJECT_DOWNLOAD_MAX_COMPONENTS)
|
| +
|
| + num_components, component_size = _GetPartitionInfo(
|
| + src_obj_metadata.size, max_components, sliced_download_component_size)
|
| +
|
| + components_to_download = []
|
| + component_lengths = []
|
| + for i in range(num_components):
|
| + start_byte = i * component_size
|
| + end_byte = min((i + 1) * (component_size) - 1, src_obj_metadata.size - 1)
|
| + component_lengths.append(end_byte - start_byte + 1)
|
| + components_to_download.append(
|
| + PerformSlicedDownloadObjectToFileArgs(
|
| + i, src_url, src_obj_metadata, dst_url, download_file_name,
|
| + start_byte, end_byte))
|
| + return components_to_download, component_lengths
|
| +
|
| +
|
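| The partitioning above yields inclusive, non-overlapping byte ranges that exactly cover the object. _GetPartitionInfo itself is outside this diff, so the arithmetic below is one plausible standalone version of the same idea, not gsutil's exact rule:
|
|     def partition(object_size, max_components, target_component_size):
|         # Ceil-divide to get a component count, capped at max_components,
|         # then derive inclusive [start_byte, end_byte] ranges.
|         num_components = min(
|             (object_size + target_component_size - 1) // target_component_size,
|             max_components)
|         component_size = (object_size + num_components - 1) // num_components
|         ranges = []
|         for i in range(num_components):
|             start = i * component_size
|             end = min((i + 1) * component_size - 1, object_size - 1)
|             ranges.append((start, end))
|         return ranges
|
|     # Example: a 10-byte object split into at most 3 components of ~4 bytes.
|     assert partition(10, 3, 4) == [(0, 3), (4, 7), (8, 9)]
|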
| +def _DoSlicedDownload(src_url, src_obj_metadata, dst_url, download_file_name,
|
| + command_obj, logger, copy_exception_handler,
|
| + api_selector):
|
| + """Downloads a cloud object to a local file using sliced download.
|
| +
|
| + Byte ranges are decided for each thread/process, and then the parts are
|
| + downloaded in parallel.
|
| +
|
| + Args:
|
| + src_url: Source CloudUrl.
|
| + src_obj_metadata: Metadata from the source object.
|
| + dst_url: Destination FileUrl.
|
| + download_file_name: Temporary file name to be used for download.
|
| + command_obj: command object for use in Apply in sliced downloads.
|
| + logger: for outputting log messages.
|
| + copy_exception_handler: For handling copy exceptions during Apply.
|
| + api_selector: The Cloud API implementation used.
|
| +
|
| + Returns:
|
| + (bytes_transferred, crc32c)
|
| + bytes_transferred: Number of bytes transferred from server this call.
|
| + crc32c: a crc32c hash value (integer) for the downloaded bytes, or None if
|
| + crc32c hashing wasn't performed.
|
| + """
|
| + components_to_download, component_lengths = _PartitionObject(
|
| + src_url, src_obj_metadata, dst_url, download_file_name)
|
| +
|
| + num_components = len(components_to_download)
|
| + _MaintainSlicedDownloadTrackerFiles(src_obj_metadata, dst_url,
|
| + download_file_name, logger,
|
| + api_selector, num_components)
|
| +
|
| + # Resize the download file so each child process can seek to its start byte.
|
| + with open(download_file_name, 'ab') as fp:
|
| + fp.truncate(src_obj_metadata.size)
|
| +
|
| + cp_results = command_obj.Apply(
|
| + _PerformSlicedDownloadObjectToFile, components_to_download,
|
| + copy_exception_handler, arg_checker=gslib.command.DummyArgChecker,
|
| + parallel_operations_override=True, should_return_results=True)
|
| +
|
| + if len(cp_results) < num_components:
|
| + raise CommandException(
|
| + 'Some components of %s were not downloaded successfully. '
|
| + 'Please retry this download.' % dst_url.object_name)
|
| +
|
| + # Crc32c hashes have to be concatenated in the correct order.
|
| + cp_results = sorted(cp_results, key=attrgetter('component_num'))
|
| + crc32c = cp_results[0].crc32c
|
| + if crc32c is not None:
|
| + for i in range(1, num_components):
|
| + crc32c = ConcatCrc32c(crc32c, cp_results[i].crc32c,
|
| + component_lengths[i])
|
| +
|
| + bytes_transferred = 0
|
| + expect_gzip = (src_obj_metadata.contentEncoding and
|
| + src_obj_metadata.contentEncoding.lower().endswith('gzip'))
|
| + for cp_result in cp_results:
|
| + bytes_transferred += cp_result.bytes_transferred
|
| + server_gzip = (cp_result.server_encoding and
|
| + cp_result.server_encoding.lower().endswith('gzip'))
|
| + # If the server gzipped any components on the fly, we will have no chance of
|
| + # properly reconstructing the file.
|
| + if server_gzip and not expect_gzip:
|
| + raise CommandException(
|
| + 'Download of %s failed because the server sent back data with an '
|
| + 'unexpected encoding.' % dst_url.object_name)
|
| +
|
| + return bytes_transferred, crc32c
|
| +
|
|
|
| - if not dst_url.IsStream():
|
| - serialization_data = json.dumps(serialization_dict)
|
| +def _DownloadObjectToFileResumable(src_url, src_obj_metadata, dst_url,
|
| + download_file_name, gsutil_api, logger,
|
| + digesters, component_num=None, start_byte=0,
|
| + end_byte=None):
|
| + """Downloads an object to a local file using the resumable strategy.
|
| +
|
| + Args:
|
| + src_url: Source CloudUrl.
|
| + src_obj_metadata: Metadata from the source object.
|
| + dst_url: Destination FileUrl.
|
| + download_file_name: Temporary file name to be used for download.
|
| + gsutil_api: gsutil Cloud API instance to use for the download.
|
| + logger: for outputting log messages.
|
| + digesters: Digesters corresponding to the hash algorithms that will be used
|
| + for validation.
|
| + component_num: Which component of a sliced download this call is for, or
|
| + None if this is not a sliced download.
|
| + start_byte: The first byte of a byte range for a sliced download.
|
| + end_byte: The last byte of a byte range for a sliced download.
|
| +
|
| + Returns:
|
| + (bytes_transferred, server_encoding)
|
| + bytes_transferred: Number of bytes transferred from server this call.
|
| + server_encoding: Content-encoding string if it was detected that the server
|
| + sent encoded bytes during transfer, None otherwise.
|
| + """
|
| + if end_byte is None:
|
| + end_byte = src_obj_metadata.size - 1
|
| + download_size = end_byte - start_byte + 1
|
| +
|
| + is_sliced = component_num is not None
|
| + api_selector = gsutil_api.GetApiSelector(provider=src_url.scheme)
|
| + server_encoding = None
|
| +
|
| + # Used for logging
|
| + download_name = dst_url.object_name
|
| + if is_sliced:
|
| + download_name += ' component %d' % component_num
|
| +
|
| + try:
|
| + fp = open(download_file_name, 'r+b')
|
| + fp.seek(start_byte)
|
| + api_selector = gsutil_api.GetApiSelector(provider=src_url.scheme)
|
| + existing_file_size = GetFileSize(fp)
|
| +
|
| + tracker_file_name, download_start_byte = (
|
| + ReadOrCreateDownloadTrackerFile(src_obj_metadata, dst_url, logger,
|
| + api_selector, start_byte,
|
| + existing_file_size, component_num))
|
| +
|
| + if download_start_byte < start_byte or download_start_byte > end_byte + 1:
|
| + DeleteTrackerFile(tracker_file_name)
|
| + raise CommandException(
|
| + 'Resumable download start point for %s is not in the correct byte '
|
| + 'range. Deleting tracker file, so if you re-try this download it '
|
| + 'will start from scratch' % download_name)
|
| +
|
| + download_complete = (download_start_byte == start_byte + download_size)
|
| + resuming = (download_start_byte != start_byte) and not download_complete
|
| + if resuming:
|
| + logger.info('Resuming download for %s', download_name)
|
| + elif download_complete:
|
| + logger.info(
|
| + 'Download already complete for %s, skipping download but '
|
| + 'will run integrity checks.', download_name)
|
| +
|
| + # This is used for resuming downloads, but also for passing the mediaLink
|
| + # and size into the download for new downloads so that we can avoid
|
| + # making an extra HTTP call.
|
| + serialization_data = GetDownloadSerializationData(
|
| + src_obj_metadata, progress=download_start_byte)
|
| +
|
| + if resuming or download_complete:
|
| + # Catch up our digester with the hash data.
|
| + bytes_digested = 0
|
| + total_bytes_to_digest = download_start_byte - start_byte
|
| + hash_callback = ProgressCallbackWithBackoff(
|
| + total_bytes_to_digest,
|
| + FileProgressCallbackHandler(
|
| + ConstructAnnounceText('Hashing',
|
| + dst_url.url_string), logger).call)
|
| +
|
| + while bytes_digested < total_bytes_to_digest:
|
| + bytes_to_read = min(DEFAULT_FILE_BUFFER_SIZE,
|
| + total_bytes_to_digest - bytes_digested)
|
| + data = fp.read(bytes_to_read)
|
| + bytes_digested += bytes_to_read
|
| + for alg_name in digesters:
|
| + digesters[alg_name].update(data)
|
| + hash_callback.Progress(len(data))
|
| +
|
| + elif not is_sliced:
|
| + # Delete file contents and start entire object download from scratch.
|
| + fp.truncate(0)
|
| + existing_file_size = 0
|
|
|
| progress_callback = FileProgressCallbackHandler(
|
| - ConstructAnnounceText('Downloading', dst_url.url_string),
|
| - logger).call
|
| + ConstructAnnounceText('Downloading', dst_url.url_string), logger,
|
| + start_byte, download_size).call
|
| +
|
| if global_copy_helper_opts.test_callback_file:
|
| with open(global_copy_helper_opts.test_callback_file, 'rb') as test_fp:
|
| progress_callback = pickle.loads(test_fp.read()).call
|
|
|
| - start_time = time.time()
|
| + if is_sliced and src_obj_metadata.size >= ResumableThreshold():
|
| + fp = SlicedDownloadFileWrapper(fp, tracker_file_name, src_obj_metadata,
|
| + start_byte, end_byte)
|
| +
|
| # TODO: With gzip encoding (which may occur on-the-fly and not be part of
|
| # the object's metadata), when we request a range to resume, it's possible
|
| # that the server will just resend the entire object, which means our
|
| @@ -1811,58 +2175,186 @@ def _DownloadObjectToFile(src_url, src_obj_metadata, dst_url,
|
| # the local file in the case of a failed gzip hash anyway, but it would
|
| # be better if we actively detected this case.
|
| if not download_complete:
|
| + fp.seek(download_start_byte)
|
| server_encoding = gsutil_api.GetObjectMedia(
|
| src_url.bucket_name, src_url.object_name, fp,
|
| - start_byte=download_start_point, generation=src_url.generation,
|
| - object_size=src_obj_metadata.size,
|
| - download_strategy=download_strategy, provider=src_url.scheme,
|
| - serialization_data=serialization_data, digesters=digesters,
|
| - progress_callback=progress_callback)
|
| -
|
| - end_time = time.time()
|
| -
|
| - # If a custom test method is defined, call it here. For the copy command,
|
| - # test methods are expected to take one argument: an open file pointer,
|
| - # and are used to perturb the open file during download to exercise
|
| - # download error detection.
|
| - if test_method:
|
| - test_method(fp)
|
| + start_byte=download_start_byte, end_byte=end_byte,
|
| + generation=src_url.generation, object_size=src_obj_metadata.size,
|
| + download_strategy=CloudApi.DownloadStrategy.RESUMABLE,
|
| + provider=src_url.scheme, serialization_data=serialization_data,
|
| + digesters=digesters, progress_callback=progress_callback)
|
|
|
| except ResumableDownloadException as e:
|
| - logger.warning('Caught ResumableDownloadException (%s) for file %s.',
|
| - e.reason, file_name)
|
| + logger.warning('Caught ResumableDownloadException (%s) for download of %s.',
|
| + e.reason, download_name)
|
| raise
|
| + finally:
|
| + if fp:
|
| + fp.close()
|
| +
|
| + bytes_transferred = end_byte - download_start_byte + 1
|
| + return bytes_transferred, server_encoding
|
| +
|
|
|
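| Resuming above requires replaying the already-downloaded bytes through the hash digesters so the final digest covers the whole range; a minimal standalone sketch with hashlib (illustrative helper, not gsutil code):
|
|     import hashlib
|
|     def catch_up_digest(path, num_bytes, buffer_size=8192):
|         # Re-hash the first num_bytes of a partially downloaded file so the
|         # digest can continue from the resume point.
|         digest = hashlib.md5()
|         remaining = num_bytes
|         with open(path, 'rb') as fp:
|             while remaining > 0:
|                 data = fp.read(min(buffer_size, remaining))
|                 if not data:
|                     break
|                 digest.update(data)
|                 remaining -= len(data)
|         return digest
|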
| +def _DownloadObjectToFileNonResumable(src_url, src_obj_metadata, dst_url,
|
| + download_file_name, gsutil_api, logger,
|
| + digesters):
|
| + """Downloads an object to a local file using the non-resumable strategy.
|
| +
|
| + Args:
|
| + src_url: Source CloudUrl.
|
| + src_obj_metadata: Metadata from the source object.
|
| + dst_url: Destination FileUrl.
|
| + download_file_name: Temporary file name to be used for download.
|
| + gsutil_api: gsutil Cloud API instance to use for the download.
|
| + logger: for outputting log messages.
|
| + digesters: Digesters corresponding to the hash algorithms that will be used
|
| + for validation.
|
| + Returns:
|
| + (bytes_transferred, server_encoding)
|
| + bytes_transferred: Number of bytes transferred from server this call.
|
| + server_encoding: Content-encoding string if it was detected that the server
|
| + sent encoded bytes during transfer, None otherwise.
|
| + """
|
| + try:
|
| + fp = open(download_file_name, 'wb')
|
| +
|
| + # This is used to pass the mediaLink and the size into the download so that
|
| + # we can avoid making an extra HTTP call.
|
| + serialization_data = GetDownloadSerializationData(src_obj_metadata)
|
| +
|
| + progress_callback = FileProgressCallbackHandler(
|
| + ConstructAnnounceText('Downloading', dst_url.url_string), logger).call
|
| +
|
| + if global_copy_helper_opts.test_callback_file:
|
| + with open(global_copy_helper_opts.test_callback_file, 'rb') as test_fp:
|
| + progress_callback = pickle.loads(test_fp.read()).call
|
| +
|
| + server_encoding = gsutil_api.GetObjectMedia(
|
| + src_url.bucket_name, src_url.object_name, fp,
|
| + generation=src_url.generation, object_size=src_obj_metadata.size,
|
| + download_strategy=CloudApi.DownloadStrategy.ONE_SHOT,
|
| + provider=src_url.scheme, serialization_data=serialization_data,
|
| + digesters=digesters, progress_callback=progress_callback)
|
| finally:
|
| if fp:
|
| fp.close()
|
| - for file_name in open_files:
|
| - open_files_map.delete(file_name)
|
| -
|
| - # If we decompressed a content-encoding gzip file on the fly, this may not
|
| - # be accurate, but it is the best we can do without going deep into the
|
| - # underlying HTTP libraries. Note that this value is only used for
|
| - # reporting in log messages; inaccuracy doesn't impact the integrity of the
|
| - # download.
|
| - bytes_transferred = src_obj_metadata.size - download_start_point
|
| +
|
| + return src_obj_metadata.size, server_encoding
|
| +
|
| +
|
| +def _DownloadObjectToFile(src_url, src_obj_metadata, dst_url,
|
| + gsutil_api, logger, command_obj,
|
| + copy_exception_handler, allow_splitting=True):
|
| + """Downloads an object to a local file.
|
| +
|
| + Args:
|
| + src_url: Source CloudUrl.
|
| + src_obj_metadata: Metadata from the source object.
|
| + dst_url: Destination FileUrl.
|
| + gsutil_api: gsutil Cloud API instance to use for the download.
|
| + logger: for outputting log messages.
|
| + command_obj: command object for use in Apply in sliced downloads.
|
| + copy_exception_handler: For handling copy exceptions during Apply.
|
| + allow_splitting: Whether or not to allow sliced download.
|
| + Returns:
|
| + (elapsed_time, bytes_transferred, dst_url, md5), where time elapsed
|
| + excludes initial GET.
|
| +
|
| + Raises:
|
| + FileConcurrencySkipError: if this download is already in progress.
|
| + CommandException: if other errors encountered.
|
| + """
|
| + global open_files_map, open_files_lock
|
| + if dst_url.object_name.endswith(dst_url.delim):
|
| + logger.warn('\n'.join(textwrap.wrap(
|
| + 'Skipping attempt to download to filename ending with slash (%s). This '
|
| + 'typically happens when using gsutil to download from a subdirectory '
|
| + 'created by the Cloud Console (https://cloud.google.com/console)'
|
| + % dst_url.object_name)))
|
| + return (0, 0, dst_url, '')
|
| +
|
| + api_selector = gsutil_api.GetApiSelector(provider=src_url.scheme)
|
| + download_strategy = _SelectDownloadStrategy(dst_url)
|
| + sliced_download = _ShouldDoSlicedDownload(
|
| + download_strategy, src_obj_metadata, allow_splitting, logger)
|
| +
|
| + download_file_name, need_to_unzip = _GetDownloadFile(
|
| + dst_url, src_obj_metadata, logger)
|
| +
|
| + # Ensure another process/thread is not already writing to this file.
|
| + with open_files_lock:
|
| + if open_files_map.get(download_file_name, False):
|
| + raise FileConcurrencySkipError
|
| + open_files_map[download_file_name] = True
|
| +
|
| + # Set up hash digesters.
|
| + consider_md5 = src_obj_metadata.md5Hash and not sliced_download
|
| + hash_algs = GetDownloadHashAlgs(logger, consider_md5=consider_md5,
|
| + consider_crc32c=src_obj_metadata.crc32c)
|
| + digesters = dict((alg, hash_algs[alg]()) for alg in hash_algs or {})
|
| +
|
| + # Tracks whether the server used a gzip encoding.
|
| + server_encoding = None
|
| + download_complete = (src_obj_metadata.size == 0)
|
| + bytes_transferred = 0
|
| +
|
| + start_time = time.time()
|
| + if not download_complete:
|
| + if sliced_download:
|
| + (bytes_transferred, crc32c) = (
|
| + _DoSlicedDownload(src_url, src_obj_metadata, dst_url,
|
| + download_file_name, command_obj, logger,
|
| + copy_exception_handler, api_selector))
|
| + if 'crc32c' in digesters:
|
| + digesters['crc32c'].crcValue = crc32c
|
| + elif download_strategy is CloudApi.DownloadStrategy.ONE_SHOT:
|
| + (bytes_transferred, server_encoding) = (
|
| + _DownloadObjectToFileNonResumable(src_url, src_obj_metadata, dst_url,
|
| + download_file_name, gsutil_api,
|
| + logger, digesters))
|
| + elif download_strategy is CloudApi.DownloadStrategy.RESUMABLE:
|
| + (bytes_transferred, server_encoding) = (
|
| + _DownloadObjectToFileResumable(src_url, src_obj_metadata, dst_url,
|
| + download_file_name, gsutil_api, logger,
|
| + digesters))
|
| + else:
|
| + raise CommandException('Invalid download strategy %s chosen for '
|
| + 'file %s' % (download_strategy,
|
| + download_file_name))
|
| + end_time = time.time()
|
| +
|
| server_gzip = server_encoding and server_encoding.lower().endswith('gzip')
|
| - local_md5 = _ValidateDownloadHashes(logger, src_url, src_obj_metadata,
|
| - dst_url, need_to_unzip, server_gzip,
|
| - digesters, hash_algs, api_selector,
|
| - bytes_transferred)
|
| + local_md5 = _ValidateAndCompleteDownload(
|
| + logger, src_url, src_obj_metadata, dst_url, need_to_unzip, server_gzip,
|
| + digesters, hash_algs, download_file_name, api_selector, bytes_transferred)
|
| +
|
| + with open_files_lock:
|
| + open_files_map.delete(download_file_name)
|
|
|
| return (end_time - start_time, bytes_transferred, dst_url, local_md5)
|
|
|
|
|
| -def _GetDownloadZipFileName(file_name):
|
| - """Returns the file name for a temporarily compressed downloaded file."""
|
| - return '%s_.gztmp' % file_name
|
| +def _GetDownloadTempZipFileName(dst_url):
|
| + """Returns temporary file name for a temporarily compressed download."""
|
| + return '%s_.gztmp' % dst_url.object_name
|
| +
|
| +
|
| +def _GetDownloadTempFileName(dst_url):
|
| + """Returns temporary download file name for uncompressed downloads."""
|
| + return '%s_.gstmp' % dst_url.object_name
|
|
|
|
|
| -def _ValidateDownloadHashes(logger, src_url, src_obj_metadata, dst_url,
|
| - need_to_unzip, server_gzip, digesters, hash_algs,
|
| - api_selector, bytes_transferred):
|
| - """Validates a downloaded file's integrity.
|
| +def _ValidateAndCompleteDownload(logger, src_url, src_obj_metadata, dst_url,
|
| + need_to_unzip, server_gzip, digesters,
|
| + hash_algs, download_file_name,
|
| + api_selector, bytes_transferred):
|
| + """Validates and performs necessary operations on a downloaded file.
|
| +
|
| + Validates the integrity of the downloaded file using hash_algs. If the file
|
| + was compressed (temporarily), the file will be decompressed. Then, if the
|
| + integrity of the file was successfully validated, the file will be moved
|
| + from its temporary download location to its permanent location on disk.
|
|
|
| Args:
|
| logger: For outputting log messages.
|
| @@ -1880,6 +2372,7 @@ def _ValidateDownloadHashes(logger, src_url, src_obj_metadata, dst_url,
|
| hash must be recomputed from the local file.
|
| hash_algs: dict of {string, hash algorithm} that can be used if digesters
|
| don't have up-to-date digests.
|
| + download_file_name: Temporary file name that was used for download.
|
| api_selector: The Cloud API implementation used (used for tracker file naming).
|
| bytes_transferred: Number of bytes downloaded (used for logging).
|
|
|
| @@ -1887,10 +2380,10 @@ def _ValidateDownloadHashes(logger, src_url, src_obj_metadata, dst_url,
|
| An MD5 of the local file, if one was calculated as part of the integrity
|
| check.
|
| """
|
| - file_name = dst_url.object_name
|
| - download_file_name = (_GetDownloadZipFileName(file_name) if need_to_unzip else
|
| - file_name)
|
| + final_file_name = dst_url.object_name
|
| + file_name = download_file_name
|
| digesters_succeeded = True
|
| +
|
| for alg in digesters:
|
| # If we get a digester with a None algorithm, the underlying
|
| # implementation failed to calculate a digest, so we will need to
|
| @@ -1903,15 +2396,14 @@ def _ValidateDownloadHashes(logger, src_url, src_obj_metadata, dst_url,
|
| local_hashes = _CreateDigestsFromDigesters(digesters)
|
| else:
|
| local_hashes = _CreateDigestsFromLocalFile(
|
| - logger, hash_algs, download_file_name, src_obj_metadata)
|
| + logger, hash_algs, file_name, final_file_name, src_obj_metadata)
|
|
|
| digest_verified = True
|
| hash_invalid_exception = None
|
| try:
|
| - _CheckHashes(logger, src_url, src_obj_metadata, download_file_name,
|
| + _CheckHashes(logger, src_url, src_obj_metadata, final_file_name,
|
| local_hashes)
|
| - DeleteTrackerFile(GetTrackerFilePath(
|
| - dst_url, TrackerFileType.DOWNLOAD, api_selector))
|
| + DeleteDownloadTrackerFiles(dst_url, api_selector)
|
| except HashMismatchException, e:
|
| # If a non-gzipped object gets sent with gzip content encoding, the hash
|
| # we calculate will match the gzipped bytes, not the original object. Thus,
|
| @@ -1929,34 +2421,27 @@ def _ValidateDownloadHashes(logger, src_url, src_obj_metadata, dst_url,
|
| hash_invalid_exception = e
|
| digest_verified = False
|
| else:
|
| - DeleteTrackerFile(GetTrackerFilePath(
|
| - dst_url, TrackerFileType.DOWNLOAD, api_selector))
|
| + DeleteDownloadTrackerFiles(dst_url, api_selector)
|
| if _RENAME_ON_HASH_MISMATCH:
|
| - os.rename(download_file_name,
|
| - download_file_name + _RENAME_ON_HASH_MISMATCH_SUFFIX)
|
| + os.rename(file_name,
|
| + final_file_name + _RENAME_ON_HASH_MISMATCH_SUFFIX)
|
| else:
|
| - os.unlink(download_file_name)
|
| + os.unlink(file_name)
|
| raise
|
|
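On a hash mismatch the code above either preserves the corrupt temp file under a rename suffix (when _RENAME_ON_HASH_MISMATCH is set) or deletes it outright, then re-raises so the copy is reported as failed. A compact sketch of that cleanup decision; the flag argument, suffix value, and exception type below are illustrative stand-ins for the module's own constants and HashMismatchException.

    # Sketch, not part of the diff: keep or discard a temp file whose digests
    # did not validate, then propagate the failure to the caller.
    import os

    def _CleanUpCorruptDownload(temp_name, final_name, rename_on_mismatch,
                                suffix='_.corrupt'):
      if rename_on_mismatch:
        # Preserve the bad bytes next to the intended destination for debugging.
        os.rename(temp_name, final_name + suffix)
      else:
        os.unlink(temp_name)
      raise ValueError('hash mismatch for %s' % final_name)
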
|
| - if server_gzip and not need_to_unzip:
|
| - # Server compressed bytes on-the-fly, thus we need to rename and decompress.
|
| - # We can't decompress on-the-fly because prior to Python 3.2 the gzip
|
| - # module makes a bunch of seek calls on the stream.
|
| - download_file_name = _GetDownloadZipFileName(file_name)
|
| - os.rename(file_name, download_file_name)
|
| -
|
| if need_to_unzip or server_gzip:
|
| # Log that we're uncompressing if the file is big enough that
|
| # decompressing would make it look like the transfer "stalled" at the end.
|
| if bytes_transferred > TEN_MIB:
|
| logger.info(
|
| - 'Uncompressing downloaded tmp file to %s...', file_name)
|
| + 'Uncompressing temporarily gzipped file to %s...', final_file_name)
|
|
|
| - # Downloaded gzipped file to a filename w/o .gz extension, so unzip.
|
| gzip_fp = None
|
| try:
|
| - gzip_fp = gzip.open(download_file_name, 'rb')
|
| - with open(file_name, 'wb') as f_out:
|
| + # Downloaded temporarily gzipped file, unzip to file without '_.gztmp'
|
| + # suffix.
|
| + gzip_fp = gzip.open(file_name, 'rb')
|
| + with open(final_file_name, 'wb') as f_out:
|
| data = gzip_fp.read(GZIP_CHUNK_SIZE)
|
| while data:
|
| f_out.write(data)
|
| @@ -1972,19 +2457,19 @@ def _ValidateDownloadHashes(logger, src_url, src_obj_metadata, dst_url,
|
| if gzip_fp:
|
| gzip_fp.close()
|
|
|
| - os.unlink(download_file_name)
|
| + os.unlink(file_name)
|
| + file_name = final_file_name
|
|
|
| if not digest_verified:
|
| try:
|
| # Recalculate hashes on the unzipped local file.
|
| - local_hashes = _CreateDigestsFromLocalFile(logger, hash_algs, file_name,
|
| - src_obj_metadata)
|
| - _CheckHashes(logger, src_url, src_obj_metadata, file_name, local_hashes)
|
| - DeleteTrackerFile(GetTrackerFilePath(
|
| - dst_url, TrackerFileType.DOWNLOAD, api_selector))
|
| + local_hashes = _CreateDigestsFromLocalFile(
|
| + logger, hash_algs, file_name, final_file_name, src_obj_metadata)
|
| + _CheckHashes(logger, src_url, src_obj_metadata, final_file_name,
|
| + local_hashes)
|
| + DeleteDownloadTrackerFiles(dst_url, api_selector)
|
| except HashMismatchException:
|
| - DeleteTrackerFile(GetTrackerFilePath(
|
| - dst_url, TrackerFileType.DOWNLOAD, api_selector))
|
| + DeleteDownloadTrackerFiles(dst_url, api_selector)
|
| if _RENAME_ON_HASH_MISMATCH:
|
| os.rename(file_name,
|
| file_name + _RENAME_ON_HASH_MISMATCH_SUFFIX)
|
| @@ -1992,6 +2477,13 @@ def _ValidateDownloadHashes(logger, src_url, src_obj_metadata, dst_url,
|
| os.unlink(file_name)
|
| raise
|
|
|
| + if file_name != final_file_name:
|
| + # Data is still in a temporary file, so move it to a permanent location.
|
| + if os.path.exists(final_file_name):
|
| + os.unlink(final_file_name)
|
| + os.rename(file_name,
|
| + final_file_name)
|
| +
|
| if 'md5' in local_hashes:
|
| return local_hashes['md5']
|
|
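The decompression branch above streams the temporarily gzipped file into its final name in fixed-size reads rather than loading it whole, then deletes the compressed temp file. A self-contained sketch of that loop; the chunk-size constant and helper name are illustrative (the module defines its own GZIP_CHUNK_SIZE).

    # Sketch, not part of the diff: decompress a '_.gztmp'-style temp file to
    # its final name in chunks, then remove the compressed temp file.
    import gzip
    import os

    GZIP_CHUNK_SIZE = 8192  # illustrative; the real constant lives elsewhere

    def _DecompressTempFile(temp_gzip_name, final_file_name):
      gzip_fp = None
      try:
        gzip_fp = gzip.open(temp_gzip_name, 'rb')
        with open(final_file_name, 'wb') as f_out:
          data = gzip_fp.read(GZIP_CHUNK_SIZE)
          while data:
            f_out.write(data)
            data = gzip_fp.read(GZIP_CHUNK_SIZE)
      finally:
        if gzip_fp:
          gzip_fp.close()
      os.unlink(temp_gzip_name)
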
|
| @@ -2123,7 +2615,7 @@ def _CopyObjToObjDaisyChainMode(src_url, src_obj_metadata, dst_url,
|
| # pylint: disable=too-many-statements
|
| def PerformCopy(logger, src_url, dst_url, gsutil_api, command_obj,
|
| copy_exception_handler, allow_splitting=True,
|
| - headers=None, manifest=None, gzip_exts=None, test_method=None):
|
| + headers=None, manifest=None, gzip_exts=None):
|
| """Performs copy from src_url to dst_url, handling various special cases.
|
|
|
| Args:
|
| @@ -2131,14 +2623,14 @@ def PerformCopy(logger, src_url, dst_url, gsutil_api, command_obj,
|
| src_url: Source StorageUrl.
|
| dst_url: Destination StorageUrl.
|
| gsutil_api: gsutil Cloud API instance to use for the copy.
|
| - command_obj: command object for use in Apply in parallel composite uploads.
|
| + command_obj: command object for use in Apply in parallel composite uploads
|
| + and sliced object downloads.
|
| copy_exception_handler: for handling copy exceptions during Apply.
|
| allow_splitting: Whether to allow the file to be split into component
|
| - pieces for an parallel composite upload.
|
| + pieces for a parallel composite upload or download.
|
| headers: optional headers to use for the copy operation.
|
| manifest: optional manifest for tracking copy operations.
|
| gzip_exts: List of file extensions to gzip for uploads, if any.
|
| - test_method: optional test method for modifying files during unit tests.
|
|
|
| Returns:
|
| (elapsed_time, bytes_transferred, version-specific dst_url) excluding
|
| @@ -2194,7 +2686,7 @@ def PerformCopy(logger, src_url, dst_url, gsutil_api, command_obj,
|
| else:
|
| # Just get the fields needed to validate the download.
|
| src_obj_fields = ['crc32c', 'contentEncoding', 'contentType', 'etag',
|
| - 'mediaLink', 'md5Hash', 'size']
|
| + 'mediaLink', 'md5Hash', 'size', 'generation']
|
|
|
| if (src_url.scheme == 's3' and
|
| global_copy_helper_opts.skip_unsupported_objects):
|
| @@ -2231,8 +2723,14 @@ def PerformCopy(logger, src_url, dst_url, gsutil_api, command_obj,
|
| try:
|
| src_obj_filestream = GetStreamFromFileUrl(src_url)
|
| except Exception, e: # pylint: disable=broad-except
|
| - raise CommandException('Error opening file "%s": %s.' % (src_url,
|
| - e.message))
|
| + if command_obj.continue_on_error:
|
| + message = 'Error copying %s: %s' % (src_url, str(e))
|
| + command_obj.op_failure_count += 1
|
| + logger.error(message)
|
| + return
|
| + else:
|
| + raise CommandException('Error opening file "%s": %s.' % (src_url,
|
| + e.message))
|
| if src_url.IsStream():
|
| src_obj_size = None
|
| else:
|
| @@ -2308,7 +2806,9 @@ def PerformCopy(logger, src_url, dst_url, gsutil_api, command_obj,
|
| if src_url.IsCloudUrl():
|
| if dst_url.IsFileUrl():
|
| return _DownloadObjectToFile(src_url, src_obj_metadata, dst_url,
|
| - gsutil_api, logger, test_method=test_method)
|
| + gsutil_api, logger, command_obj,
|
| + copy_exception_handler,
|
| + allow_splitting=allow_splitting)
|
| elif copy_in_the_cloud:
|
| return _CopyObjToObjInTheCloud(src_url, src_obj_metadata, dst_url,
|
| dst_obj_metadata, preconditions,
|
| @@ -2499,27 +2999,8 @@ def GetPathBeforeFinalDir(url):
|
| return url.url_string.rstrip(sep).rpartition(sep)[0]
|
|
|
|
|
| -def _DivideAndCeil(dividend, divisor):
|
| - """Returns ceil(dividend / divisor).
|
| -
|
| - Takes care to avoid the pitfalls of floating point arithmetic that could
|
| - otherwise yield the wrong result for large numbers.
|
| -
|
| - Args:
|
| - dividend: Dividend for the operation.
|
| - divisor: Divisor for the operation.
|
| -
|
| - Returns:
|
| - Quotient.
|
| - """
|
| - quotient = dividend // divisor
|
| - if (dividend % divisor) != 0:
|
| - quotient += 1
|
| - return quotient
|
| -
|
| -
|
| def _GetPartitionInfo(file_size, max_components, default_component_size):
|
| - """Gets info about a file partition for parallel composite uploads.
|
| + """Gets info about a file partition for parallel file/object transfers.
|
|
|
| Args:
|
| file_size: The number of bytes in the file to be partitioned.
|
| @@ -2532,22 +3013,29 @@ def _GetPartitionInfo(file_size, max_components, default_component_size):
|
| file_size != 0 (mod num_components)).
|
| """
|
| # num_components = ceil(file_size / default_component_size)
|
| - num_components = _DivideAndCeil(file_size, default_component_size)
|
| + num_components = DivideAndCeil(file_size, default_component_size)
|
|
|
| # num_components must be in the range [2, max_components]
|
| num_components = max(min(num_components, max_components), 2)
|
|
|
| # component_size = ceil(file_size / num_components)
|
| - component_size = _DivideAndCeil(file_size, num_components)
|
| + component_size = DivideAndCeil(file_size, num_components)
|
| return (num_components, component_size)
|
|
|
|
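With DivideAndCeil as integer ceiling division, the function above estimates the component count from the default component size, clamps it to the range [2, max_components], and then recomputes the per-component size. A small worked example under assumed sizes; the ceiling-division helper is inlined here only to keep the sketch self-contained.

    # Sketch, not part of the diff: the partition arithmetic used above.
    def _divide_and_ceil(dividend, divisor):
      quotient = dividend // divisor
      if dividend % divisor:
        quotient += 1
      return quotient

    def _partition(file_size, max_components, default_component_size):
      num_components = _divide_and_ceil(file_size, default_component_size)
      num_components = max(min(num_components, max_components), 2)
      component_size = _divide_and_ceil(file_size, num_components)
      return (num_components, component_size)

    # A 100 MiB file, at most 4 components, 30 MiB default component size:
    # ceil(100/30) = 4 components of ceil(100 MiB / 4) = 25 MiB each.
    MIB = 1024 * 1024
    print(_partition(100 * MIB, 4, 30 * MIB))  # -> (4, 26214400)
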
|
| -def _DeleteObjectFn(cls, url_to_delete, thread_state=None):
|
| - """Wrapper function to be used with command.Apply()."""
|
| +def _DeleteTempComponentObjectFn(cls, url_to_delete, thread_state=None):
|
| + """Wrapper func to be used with command.Apply to delete temporary objects."""
|
| gsutil_api = GetCloudApiInstance(cls, thread_state)
|
| - gsutil_api.DeleteObject(
|
| - url_to_delete.bucket_name, url_to_delete.object_name,
|
| - generation=url_to_delete.generation, provider=url_to_delete.scheme)
|
| + try:
|
| + gsutil_api.DeleteObject(
|
| + url_to_delete.bucket_name, url_to_delete.object_name,
|
| + generation=url_to_delete.generation, provider=url_to_delete.scheme)
|
| + except NotFoundException:
|
| + # The temporary object could already be gone if a retry was
|
| + # issued at a lower layer but the original request succeeded.
|
| + # Barring other errors, the top-level command should still report success,
|
| + # so don't raise here.
|
| + pass
|
|
|
|
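The wrapper above treats NotFoundException as success so that a retry issued at a lower layer cannot make the overall command report failure for a temporary component that is already gone. A minimal sketch of that tolerate-missing delete pattern; NotFoundError and the delete_object callable are hypothetical placeholders for the real API types.

    # Sketch, not part of the diff: swallow only 'not found' errors when
    # deleting a temporary component, so a delete that already happened does
    # not surface as a failure.
    class NotFoundError(Exception):
      """Placeholder for the cloud API's not-found exception."""

    def DeleteTempComponent(delete_object, bucket_name, object_name,
                            generation=None):
      try:
        delete_object(bucket_name, object_name, generation=generation)
      except NotFoundError:
        # A lower-layer retry may have already removed the object; barring
        # other errors, the top-level command should still succeed.
        pass
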
|
| def _ParseParallelUploadTrackerFile(tracker_file, tracker_file_lock):
|
|
|