Chromium Code Reviews

Unified Diff: tools/telemetry/third_party/gsutilz/gslib/copy_helper.py

Issue 1376593003: Roll gsutil version to 4.15. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 5 years, 3 months ago
Index: tools/telemetry/third_party/gsutilz/gslib/copy_helper.py
diff --git a/tools/telemetry/third_party/gsutilz/gslib/copy_helper.py b/tools/telemetry/third_party/gsutilz/gslib/copy_helper.py
index 456b15cf47a011e3c5b71a426c46dffeb520a833..aea4195bd028749f71ca361e5fd8a7dda37a9070 100644
--- a/tools/telemetry/third_party/gsutilz/gslib/copy_helper.py
+++ b/tools/telemetry/third_party/gsutilz/gslib/copy_helper.py
@@ -27,7 +27,7 @@ from hashlib import md5
import json
import logging
import mimetypes
-import multiprocessing
+from operator import attrgetter
import os
import pickle
import random
@@ -53,10 +53,13 @@ from gslib.cloud_api import ResumableDownloadException
from gslib.cloud_api import ResumableUploadAbortException
from gslib.cloud_api import ResumableUploadException
from gslib.cloud_api import ResumableUploadStartOverException
-from gslib.cloud_api_helper import GetDownloadSerializationDict
+from gslib.cloud_api_helper import GetDownloadSerializationData
from gslib.commands.compose import MAX_COMPOSE_ARITY
from gslib.commands.config import DEFAULT_PARALLEL_COMPOSITE_UPLOAD_COMPONENT_SIZE
from gslib.commands.config import DEFAULT_PARALLEL_COMPOSITE_UPLOAD_THRESHOLD
+from gslib.commands.config import DEFAULT_SLICED_OBJECT_DOWNLOAD_COMPONENT_SIZE
+from gslib.commands.config import DEFAULT_SLICED_OBJECT_DOWNLOAD_MAX_COMPONENTS
+from gslib.commands.config import DEFAULT_SLICED_OBJECT_DOWNLOAD_THRESHOLD
from gslib.cs_api_map import ApiSelector
from gslib.daisy_chain_wrapper import DaisyChainWrapper
from gslib.exception import CommandException
@@ -65,11 +68,13 @@ from gslib.file_part import FilePart
from gslib.hashing_helper import Base64EncodeHash
from gslib.hashing_helper import CalculateB64EncodedMd5FromContents
from gslib.hashing_helper import CalculateHashesFromContents
+from gslib.hashing_helper import CHECK_HASH_IF_FAST_ELSE_FAIL
+from gslib.hashing_helper import CHECK_HASH_NEVER
+from gslib.hashing_helper import ConcatCrc32c
from gslib.hashing_helper import GetDownloadHashAlgs
from gslib.hashing_helper import GetUploadHashAlgs
from gslib.hashing_helper import HashingFileUploadWrapper
-from gslib.parallelism_framework_util import ThreadAndProcessSafeDict
-from gslib.parallelism_framework_util import ThreadSafeDict
+from gslib.parallelism_framework_util import AtomicDict
from gslib.progress_callback import ConstructAnnounceText
from gslib.progress_callback import FileProgressCallbackHandler
from gslib.progress_callback import ProgressCallbackWithBackoff
@@ -77,11 +82,13 @@ from gslib.resumable_streaming_upload import ResumableStreamingJsonUploadWrapper
from gslib.storage_url import ContainsWildcard
from gslib.storage_url import StorageUrlFromString
from gslib.third_party.storage_apitools import storage_v1_messages as apitools_messages
+from gslib.tracker_file import DeleteDownloadTrackerFiles
from gslib.tracker_file import DeleteTrackerFile
from gslib.tracker_file import GetTrackerFilePath
from gslib.tracker_file import RaiseUnwritableTrackerFileException
from gslib.tracker_file import ReadOrCreateDownloadTrackerFile
from gslib.tracker_file import TrackerFileType
+from gslib.tracker_file import WriteDownloadComponentTrackerFile
from gslib.translation_helper import AddS3MarkerAclToObjectMetadata
from gslib.translation_helper import CopyObjectMetadata
from gslib.translation_helper import DEFAULT_CONTENT_TYPE
@@ -89,8 +96,11 @@ from gslib.translation_helper import GenerationFromUrlAndString
from gslib.translation_helper import ObjectMetadataFromHeaders
from gslib.translation_helper import PreconditionsFromHeaders
from gslib.translation_helper import S3MarkerAclFromObjectMetadata
+from gslib.util import CheckFreeSpace
+from gslib.util import CheckMultiprocessingAvailableAndInit
from gslib.util import CreateLock
from gslib.util import DEFAULT_FILE_BUFFER_SIZE
+from gslib.util import DivideAndCeil
from gslib.util import GetCloudApiInstance
from gslib.util import GetFileSize
from gslib.util import GetJsonResumableChunkSize
@@ -102,39 +112,34 @@ from gslib.util import IS_WINDOWS
from gslib.util import IsCloudSubdirPlaceholder
from gslib.util import MakeHumanReadable
from gslib.util import MIN_SIZE_COMPUTE_LOGGING
-from gslib.util import MultiprocessingIsAvailable
from gslib.util import ResumableThreshold
from gslib.util import TEN_MIB
+from gslib.util import UsingCrcmodExtension
from gslib.util import UTF8
from gslib.wildcard_iterator import CreateWildcardIterator
# pylint: disable=g-import-not-at-top
if IS_WINDOWS:
import msvcrt
- from ctypes import c_int
- from ctypes import c_uint64
- from ctypes import c_char_p
- from ctypes import c_wchar_p
- from ctypes import windll
- from ctypes import POINTER
- from ctypes import WINFUNCTYPE
- from ctypes import WinError
# Declare copy_helper_opts as a global because namedtuple isn't aware of
# assigning to a class member (which breaks pickling done by multiprocessing).
# For details see
# http://stackoverflow.com/questions/16377215/how-to-pickle-a-namedtuple-instance-correctly
-# Similarly can't pickle logger.
# pylint: disable=global-at-module-level
-global global_copy_helper_opts, global_logger
+global global_copy_helper_opts
# In-memory map of local files that are currently opened for write. Used to
# ensure that if we write to the same file twice (say, for example, because the
# user specified two identical source URLs), the writes occur serially.
-global open_files_map
+global open_files_map, open_files_lock
open_files_map = (
- ThreadSafeDict() if (IS_WINDOWS or not MultiprocessingIsAvailable()[0])
- else ThreadAndProcessSafeDict(multiprocessing.Manager()))
+ AtomicDict() if not CheckMultiprocessingAvailableAndInit().is_available
+ else AtomicDict(manager=gslib.util.manager))
+
+# We don't allow multiple processes on Windows, so using a process-safe lock
+# would be unnecessary.
+open_files_lock = CreateLock()
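The open_files_lock/open_files_map pair above implements a simple check-and-set guard: before writing a download file, a worker atomically checks whether another thread or process has already claimed that path and skips the copy if so (this is how _DownloadObjectToFile uses it further down in this patch). For illustration only, a minimal sketch of that guard outside gsutil, with a plain dict and threading.Lock standing in for AtomicDict and CreateLock:

import threading

open_files_lock = threading.Lock()  # stand-in for CreateLock()
open_files_map = {}                 # stand-in for AtomicDict()

class FileConcurrencySkipError(Exception):
  """Raised when another worker is already writing the same file."""

def claim_download_file(path):
  # Atomically mark `path` as being written; mirrors the check-and-set in
  # _DownloadObjectToFile.
  with open_files_lock:
    if open_files_map.get(path, False):
      raise FileConcurrencySkipError(path)
    open_files_map[path] = True

def release_download_file(path):
  # Release the claim once the download (or its validation) finishes.
  with open_files_lock:
    open_files_map.pop(path, None)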
# For debugging purposes; if True, files and objects that fail hash validation
# will be saved with the below suffix appended.
@@ -178,6 +183,15 @@ PerformParallelUploadFileToObjectArgs = namedtuple(
'filename file_start file_length src_url dst_url canned_acl '
'content_type tracker_file tracker_file_lock')
+PerformSlicedDownloadObjectToFileArgs = namedtuple(
+ 'PerformSlicedDownloadObjectToFileArgs',
+ 'component_num src_url src_obj_metadata dst_url download_file_name '
+ 'start_byte end_byte')
+
+PerformSlicedDownloadReturnValues = namedtuple(
+ 'PerformSlicedDownloadReturnValues',
+ 'component_num crc32c bytes_transferred server_encoding')
+
ObjectFromTracker = namedtuple('ObjectFromTracker',
'object_name generation')
@@ -188,13 +202,23 @@ ObjectFromTracker = namedtuple('ObjectFromTracker',
# Chunk size to use while zipping/unzipping gzip files.
GZIP_CHUNK_SIZE = 8192
+# Number of bytes to wait before updating a sliced download component tracker
+# file.
+TRACKERFILE_UPDATE_THRESHOLD = TEN_MIB
+
PARALLEL_COMPOSITE_SUGGESTION_THRESHOLD = 150 * 1024 * 1024
# S3 requires special Multipart upload logic (that we currently don't implement)
# for files > 5GiB in size.
S3_MAX_UPLOAD_SIZE = 5 * 1024 * 1024 * 1024
-suggested_parallel_composites = False
+# TODO: Create a multiprocessing framework value allocator, then use it instead
+# of a dict.
+global suggested_sliced_transfers, suggested_sliced_transfers_lock
+suggested_sliced_transfers = (
+ AtomicDict() if not CheckMultiprocessingAvailableAndInit().is_available
+ else AtomicDict(manager=gslib.util.manager))
+suggested_sliced_transfers_lock = CreateLock()
class FileConcurrencySkipError(Exception):
@@ -206,7 +230,7 @@ def _RmExceptionHandler(cls, e):
cls.logger.error(str(e))
-def _ParallelUploadCopyExceptionHandler(cls, e):
+def _ParallelCopyExceptionHandler(cls, e):
"""Simple exception handler to allow post-completion status."""
cls.logger.error(str(e))
cls.op_failure_count += 1
@@ -248,7 +272,7 @@ def _PerformParallelUploadFileToObject(cls, args, thread_state=None):
ret = _UploadFileToObject(args.src_url, fp, args.file_length,
args.dst_url, dst_object_metadata,
preconditions, gsutil_api, cls.logger, cls,
- _ParallelUploadCopyExceptionHandler,
+ _ParallelCopyExceptionHandler,
gzip_exts=None, allow_splitting=False)
finally:
if global_copy_helper_opts.canned_acl:
@@ -641,23 +665,23 @@ def _CreateDigestsFromDigesters(digesters):
return digests
-def _CreateDigestsFromLocalFile(logger, algs, file_name, src_obj_metadata):
+def _CreateDigestsFromLocalFile(logger, algs, file_name, final_file_name,
+ src_obj_metadata):
"""Creates a base64 CRC32C and/or MD5 digest from file_name.
Args:
- logger: for outputting log messages.
- algs: list of algorithms to compute.
- file_name: file to digest.
- src_obj_metadata: metadta of source object.
+ logger: For outputting log messages.
+ algs: List of algorithms to compute.
+ file_name: File to digest.
+ final_file_name: Permanent location to be used for the downloaded file
+ after validation (used for logging).
+ src_obj_metadata: Metadata of source object.
Returns:
Dict of algorithm name : base 64 encoded digest
"""
hash_dict = {}
if 'md5' in algs:
- if src_obj_metadata.size and src_obj_metadata.size > TEN_MIB:
- logger.info(
- 'Computing MD5 for %s...', file_name)
hash_dict['md5'] = md5()
if 'crc32c' in algs:
hash_dict['crc32c'] = crcmod.predefined.Crc('crc-32c')
@@ -666,7 +690,8 @@ def _CreateDigestsFromLocalFile(logger, algs, file_name, src_obj_metadata):
fp, hash_dict, ProgressCallbackWithBackoff(
src_obj_metadata.size,
FileProgressCallbackHandler(
- ConstructAnnounceText('Hashing', file_name), logger).call))
+ ConstructAnnounceText('Hashing', final_file_name),
+ logger).call))
digests = {}
for alg_name, digest in hash_dict.iteritems():
digests[alg_name] = Base64EncodeHash(digest.hexdigest())
@@ -730,7 +755,8 @@ def _CheckHashes(logger, obj_url, obj_metadata, file_name, digests,
logger: for outputting log messages.
obj_url: CloudUrl for cloud object.
obj_metadata: Cloud Object being downloaded from or uploaded to.
- file_name: Local file name on disk being downloaded to or uploaded from.
+ file_name: Local file name on disk being downloaded to or uploaded from
+ (used only for logging).
digests: Computed Digests for the object.
is_upload: If true, comparing for an uploaded object (controls logging).
@@ -985,9 +1011,10 @@ def _DoParallelCompositeUpload(fp, src_url, dst_url, dst_obj_metadata,
# those that were uploaded by a previous, failed run and have since
# changed (but still have an old generation lying around).
objects_to_delete = components + existing_objects_to_delete
- command_obj.Apply(_DeleteObjectFn, objects_to_delete, _RmExceptionHandler,
- arg_checker=gslib.command.DummyArgChecker,
- parallel_operations_override=True)
+ command_obj.Apply(
+ _DeleteTempComponentObjectFn, objects_to_delete, _RmExceptionHandler,
+ arg_checker=gslib.command.DummyArgChecker,
+ parallel_operations_override=True)
except Exception: # pylint: disable=broad-except
# If some of the delete calls fail, don't cause the whole command to
# fail. The copy was successful iff the compose call succeeded, so
@@ -1025,7 +1052,7 @@ def _ShouldDoParallelCompositeUpload(logger, allow_splitting, src_url, dst_url,
Returns:
True iff a parallel upload should be performed on the source file.
"""
- global suggested_parallel_composites
+ global suggested_sliced_transfers, suggested_sliced_transfers_lock
parallel_composite_upload_threshold = HumanReadableToBytes(config.get(
'GSUtil', 'parallel_composite_upload_threshold',
DEFAULT_PARALLEL_COMPOSITE_UPLOAD_THRESHOLD))
@@ -1041,17 +1068,18 @@ def _ShouldDoParallelCompositeUpload(logger, allow_splitting, src_url, dst_url,
# TODO: Once compiled crcmod is being distributed by major Linux distributions
# remove this check.
if (all_factors_but_size and parallel_composite_upload_threshold == 0
- and file_size >= PARALLEL_COMPOSITE_SUGGESTION_THRESHOLD
- and not suggested_parallel_composites):
- logger.info('\n'.join(textwrap.wrap(
- '==> NOTE: You are uploading one or more large file(s), which would '
- 'run significantly faster if you enable parallel composite uploads. '
- 'This feature can be enabled by editing the '
- '"parallel_composite_upload_threshold" value in your .boto '
- 'configuration file. However, note that if you do this you and any '
- 'users that download such composite files will need to have a compiled '
- 'crcmod installed (see "gsutil help crcmod").')) + '\n')
- suggested_parallel_composites = True
+ and file_size >= PARALLEL_COMPOSITE_SUGGESTION_THRESHOLD):
+ with suggested_sliced_transfers_lock:
+ if not suggested_sliced_transfers.get('suggested'):
+ logger.info('\n'.join(textwrap.wrap(
+ '==> NOTE: You are uploading one or more large file(s), which '
+ 'would run significantly faster if you enable parallel composite '
+ 'uploads. This feature can be enabled by editing the '
+ '"parallel_composite_upload_threshold" value in your .boto '
+ 'configuration file. However, note that if you do this you and any '
+ 'users that download such composite files will need to have a '
+ 'compiled crcmod installed (see "gsutil help crcmod").')) + '\n')
+ suggested_sliced_transfers['suggested'] = True
return (all_factors_but_size
and parallel_composite_upload_threshold > 0
@@ -1104,34 +1132,45 @@ def ExpandUrlToSingleBlr(url_str, gsutil_api, debug, project_id,
if storage_url.IsBucket():
return (storage_url, True)
- # For object/prefix URLs check 3 cases: (a) if the name ends with '/' treat
- # as a subdir; otherwise, use the wildcard iterator with url to
- # find if (b) there's a Prefix matching url, or (c) name is of form
- # dir_$folder$ (and in both these cases also treat dir as a subdir).
- # Cloud subdirs are always considered to be an existing container.
- if IsCloudSubdirPlaceholder(storage_url):
- return (storage_url, True)
+ # For object/prefix URLs, there are four cases that indicate the destination
+ # is a cloud subdirectory; these are always considered to be an existing
+ # container. Checking each case allows gsutil to provide Unix-like
+ # destination folder semantics, but requires up to three HTTP calls, noted
+ # below.
- # Check for the special case where we have a folder marker object.
- folder_expansion = CreateWildcardIterator(
- storage_url.versionless_url_string + '_$folder$', gsutil_api,
- debug=debug, project_id=project_id).IterAll(
- bucket_listing_fields=['name'])
- for blr in folder_expansion:
+ # Case 1: If a placeholder object ending with '/' exists.
+ if IsCloudSubdirPlaceholder(storage_url):
return (storage_url, True)
- blr_expansion = CreateWildcardIterator(url_str, gsutil_api,
- debug=debug,
- project_id=project_id).IterAll(
- bucket_listing_fields=['name'])
+ # HTTP call to make an eventually consistent check for a matching prefix,
+ # _$folder$, or empty listing.
expansion_empty = True
- for blr in blr_expansion:
+ list_iterator = gsutil_api.ListObjects(
+ storage_url.bucket_name, prefix=storage_url.object_name, delimiter='/',
+ provider=storage_url.scheme, fields=['prefixes', 'items/name'])
+ for obj_or_prefix in list_iterator:
+ # To conserve HTTP calls for the common case, we make a single listing
+ # that covers prefixes and object names. Listing object names covers the
+ # _$folder$ case and the nonexistent-object-as-subdir case. However, if
+ # there are many existing objects for which the target URL is an exact
+ # prefix, this listing could be paginated and span multiple HTTP calls.
+ # If this case becomes common, we could heuristically abort the
+ # listing operation after the first page of results and just query for the
+ # _$folder$ object directly using GetObjectMetadata.
expansion_empty = False
- if blr.IsPrefix():
+
+ if obj_or_prefix.datatype == CloudApi.CsObjectOrPrefixType.PREFIX:
+ # Case 2: If there is a matching prefix when listing the destination URL.
+ return (storage_url, True)
+ elif (obj_or_prefix.datatype == CloudApi.CsObjectOrPrefixType.OBJECT and
+ obj_or_prefix.data.name == storage_url.object_name + '_$folder$'):
+ # Case 3: If a placeholder object matching destination + _$folder$
+ # exists.
return (storage_url, True)
- return (storage_url,
- expansion_empty and treat_nonexistent_object_as_subdir)
+ # Case 4: If no objects/prefixes matched, and nonexistent objects should be
+ # treated as subdirectories.
+ return (storage_url, expansion_empty and treat_nonexistent_object_as_subdir)
def FixWindowsNaming(src_url, dst_url):
@@ -1242,46 +1281,6 @@ def _CopyObjToObjInTheCloud(src_url, src_obj_metadata, dst_url,
dst_obj.md5Hash)
-def _CheckFreeSpace(path):
- """Return path/drive free space (in bytes)."""
- if IS_WINDOWS:
- # pylint: disable=g-import-not-at-top
- try:
- # pylint: disable=invalid-name
- get_disk_free_space_ex = WINFUNCTYPE(c_int, c_wchar_p,
- POINTER(c_uint64),
- POINTER(c_uint64),
- POINTER(c_uint64))
- get_disk_free_space_ex = get_disk_free_space_ex(
- ('GetDiskFreeSpaceExW', windll.kernel32), (
- (1, 'lpszPathName'),
- (2, 'lpFreeUserSpace'),
- (2, 'lpTotalSpace'),
- (2, 'lpFreeSpace'),))
- except AttributeError:
- get_disk_free_space_ex = WINFUNCTYPE(c_int, c_char_p,
- POINTER(c_uint64),
- POINTER(c_uint64),
- POINTER(c_uint64))
- get_disk_free_space_ex = get_disk_free_space_ex(
- ('GetDiskFreeSpaceExA', windll.kernel32), (
- (1, 'lpszPathName'),
- (2, 'lpFreeUserSpace'),
- (2, 'lpTotalSpace'),
- (2, 'lpFreeSpace'),))
-
- def GetDiskFreeSpaceExErrCheck(result, unused_func, args):
- if not result:
- raise WinError()
- return args[1].value
- get_disk_free_space_ex.errcheck = GetDiskFreeSpaceExErrCheck
-
- return get_disk_free_space_ex(os.getenv('SystemDrive'))
- else:
- (_, f_frsize, _, _, f_bavail, _, _, _, _, _) = os.statvfs(path)
- return f_frsize * f_bavail
-
-
def _SetContentTypeFromFile(src_url, dst_obj_metadata):
"""Detects and sets Content-Type if src_url names a local file.
@@ -1509,7 +1508,7 @@ def _CompressFileForUpload(src_url, src_obj_filestream, src_obj_size, logger):
# Check for temp space. Assume the compressed object is at most 2x
# the size of the object (normally should compress to smaller than
# the object)
- if _CheckFreeSpace(gzip_path) < 2*int(src_obj_size):
+ if CheckFreeSpace(gzip_path) < 2*int(src_obj_size):
raise CommandException('Inadequate temp space available to compress '
'%s. See the CHANGING TEMP DIRECTORIES section '
'of "gsutil help cp" for more info.' % src_url)
@@ -1657,30 +1656,26 @@ def _UploadFileToObject(src_url, src_obj_filestream, src_obj_size,
uploaded_object.md5Hash)
-# TODO: Refactor this long function into smaller pieces.
-# pylint: disable=too-many-statements
-def _DownloadObjectToFile(src_url, src_obj_metadata, dst_url,
- gsutil_api, logger, test_method=None):
- """Downloads an object to a local file.
+def _GetDownloadFile(dst_url, src_obj_metadata, logger):
+ """Creates a new download file, and deletes the file that will be replaced.
+
+ Names and creates a temporary file for this download. Also, if there is an
+ existing file at the path where this file will be placed after the download
+ is completed, that file will be deleted.
Args:
- src_url: Source CloudUrl.
- src_obj_metadata: Metadata from the source object.
dst_url: Destination FileUrl.
- gsutil_api: gsutil Cloud API instance to use for the download.
+ src_obj_metadata: Metadata from the source object.
logger: for outputting log messages.
- test_method: Optional test method for modifying the file before validation
- during unit tests.
- Returns:
- (elapsed_time, bytes_transferred, dst_url, md5), excluding overhead like
- initial GET.
- Raises:
- CommandException: if errors encountered.
+ Returns:
+ (download_file_name, need_to_unzip)
+ download_file_name: The name of the temporary file to which the object will
+ be downloaded.
+ need_to_unzip: If true, a temporary zip file was used and must be
+ uncompressed as part of validation.
"""
- global open_files_map
- file_name = dst_url.object_name
- dir_name = os.path.dirname(file_name)
+ dir_name = os.path.dirname(dst_url.object_name)
if dir_name and not os.path.exists(dir_name):
# Do dir creation in try block so can ignore case where dir already
# exists. This is needed to avoid a race condition when running gsutil
@@ -1690,120 +1685,489 @@ def _DownloadObjectToFile(src_url, src_obj_metadata, dst_url,
except OSError, e:
if e.errno != errno.EEXIST:
raise
- api_selector = gsutil_api.GetApiSelector(provider=src_url.scheme)
+
+ need_to_unzip = False
# For gzipped objects download to a temp file and unzip. For the XML API,
- # the represents the result of a HEAD request. For the JSON API, this is
+ # this represents the result of a HEAD request. For the JSON API, this is
# the stored encoding which the service may not respect. However, if the
# server sends decompressed bytes for a file that is stored compressed
# (double compressed case), there is no way we can validate the hash and
# we will fail our hash check for the object.
if (src_obj_metadata.contentEncoding and
src_obj_metadata.contentEncoding.lower().endswith('gzip')):
- # We can't use tempfile.mkstemp() here because we need a predictable
- # filename for resumable downloads.
- download_file_name = _GetDownloadZipFileName(file_name)
+ need_to_unzip = True
+ download_file_name = _GetDownloadTempZipFileName(dst_url)
logger.info(
'Downloading to temp gzip filename %s', download_file_name)
- need_to_unzip = True
else:
- download_file_name = file_name
- need_to_unzip = False
+ download_file_name = _GetDownloadTempFileName(dst_url)
- if download_file_name.endswith(dst_url.delim):
- logger.warn('\n'.join(textwrap.wrap(
- 'Skipping attempt to download to filename ending with slash (%s). This '
- 'typically happens when using gsutil to download from a subdirectory '
- 'created by the Cloud Console (https://cloud.google.com/console)'
- % download_file_name)))
- return (0, 0, dst_url, '')
+ # If a file exists at the permanent destination (where the file will be moved
+ # after the download is completed), delete it here to reduce disk space
+ # requirements.
+ if os.path.exists(dst_url.object_name):
+ os.unlink(dst_url.object_name)
- # Set up hash digesters.
+ # Downloads open the temporary download file in r+b mode, which requires it
+ # to already exist, so we create it here if it doesn't exist already.
+ fp = open(download_file_name, 'ab')
+ fp.close()
+ return download_file_name, need_to_unzip
+
+
+def _ShouldDoSlicedDownload(download_strategy, src_obj_metadata,
+ allow_splitting, logger):
+ """Determines whether the sliced download strategy should be used.
+
+ Args:
+ download_strategy: CloudApi download strategy.
+ src_obj_metadata: Metadata from the source object.
+ allow_splitting: If false, then this function returns false.
+ logger: logging.Logger for log message output.
+
+ Returns:
+ True iff a sliced download should be performed on the source file.
+ """
+ sliced_object_download_threshold = HumanReadableToBytes(config.get(
+ 'GSUtil', 'sliced_object_download_threshold',
+ DEFAULT_SLICED_OBJECT_DOWNLOAD_THRESHOLD))
+
+ max_components = config.getint(
+ 'GSUtil', 'sliced_object_download_max_components',
+ DEFAULT_SLICED_OBJECT_DOWNLOAD_MAX_COMPONENTS)
+
+ # Don't use sliced download if it will prevent us from performing an
+ # integrity check.
+ check_hashes_config = config.get(
+ 'GSUtil', 'check_hashes', CHECK_HASH_IF_FAST_ELSE_FAIL)
+ parallel_hashing = src_obj_metadata.crc32c and UsingCrcmodExtension(crcmod)
+ hashing_okay = parallel_hashing or check_hashes_config == CHECK_HASH_NEVER
+
+ use_slice = (
+ allow_splitting
+ and download_strategy is not CloudApi.DownloadStrategy.ONE_SHOT
+ and max_components > 1
+ and hashing_okay
+ and sliced_object_download_threshold > 0
+ and src_obj_metadata.size >= sliced_object_download_threshold)
+
+ if (not use_slice
+ and src_obj_metadata.size >= PARALLEL_COMPOSITE_SUGGESTION_THRESHOLD
+ and not UsingCrcmodExtension(crcmod)
+ and check_hashes_config != CHECK_HASH_NEVER):
+ with suggested_sliced_transfers_lock:
+ if not suggested_sliced_transfers.get('suggested'):
+ logger.info('\n'.join(textwrap.wrap(
+ '==> NOTE: You are downloading one or more large file(s), which '
+ 'would run significantly faster if you enabled sliced object '
+ 'uploads. This feature is enabled by default but requires that '
+ 'compiled crcmod be installed (see "gsutil help crcmod").')) + '\n')
+ suggested_sliced_transfers['suggested'] = True
+
+ return use_slice
+
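For illustration only, the three new sliced-download settings referenced above are plain .boto values in the [GSUtil] section; the sketch below mirrors how this module resolves them. It assumes boto's config object and gslib's HumanReadableToBytes helper are importable, as copy_helper.py's existing imports suggest.

from boto import config
from gslib.commands.config import DEFAULT_SLICED_OBJECT_DOWNLOAD_COMPONENT_SIZE
from gslib.commands.config import DEFAULT_SLICED_OBJECT_DOWNLOAD_MAX_COMPONENTS
from gslib.commands.config import DEFAULT_SLICED_OBJECT_DOWNLOAD_THRESHOLD
from gslib.util import HumanReadableToBytes

# Size (in bytes) at or above which a resumable download becomes a
# sliced-download candidate; 0 disables sliced downloads entirely.
threshold = HumanReadableToBytes(config.get(
    'GSUtil', 'sliced_object_download_threshold',
    DEFAULT_SLICED_OBJECT_DOWNLOAD_THRESHOLD))

# Upper bound on the number of components downloaded in parallel per object.
max_components = config.getint(
    'GSUtil', 'sliced_object_download_max_components',
    DEFAULT_SLICED_OBJECT_DOWNLOAD_MAX_COMPONENTS)

# Target size of each component before the max_components clamp is applied.
component_size = HumanReadableToBytes(config.get(
    'GSUtil', 'sliced_object_download_component_size',
    DEFAULT_SLICED_OBJECT_DOWNLOAD_COMPONENT_SIZE))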
+
+def _PerformSlicedDownloadObjectToFile(cls, args, thread_state=None):
+ """Function argument to Apply for performing sliced downloads.
+
+ Args:
+ cls: Calling Command class.
+ args: PerformSlicedDownloadObjectToFileArgs tuple describing the target.
+ thread_state: gsutil Cloud API instance to use for the operation.
+
+ Returns:
+ PerformSlicedDownloadReturnValues named-tuple filled with:
+ component_num: The component number for this download.
+ crc32c: CRC32C hash value (integer) of the downloaded bytes.
+ bytes_transferred: The number of bytes transferred, potentially less
+ than the component size if the download was resumed.
+ """
+ gsutil_api = GetCloudApiInstance(cls, thread_state=thread_state)
hash_algs = GetDownloadHashAlgs(
- logger, src_has_md5=src_obj_metadata.md5Hash,
- src_has_crc32c=src_obj_metadata.crc32c)
+ cls.logger, consider_crc32c=args.src_obj_metadata.crc32c)
digesters = dict((alg, hash_algs[alg]()) for alg in hash_algs or {})
- fp = None
- # Tracks whether the server used a gzip encoding.
- server_encoding = None
- download_complete = False
- download_strategy = _SelectDownloadStrategy(dst_url)
- download_start_point = 0
- # This is used for resuming downloads, but also for passing the mediaLink
- # and size into the download for new downloads so that we can avoid
- # making an extra HTTP call.
- serialization_data = None
- serialization_dict = GetDownloadSerializationDict(src_obj_metadata)
- open_files = []
+ (bytes_transferred, server_encoding) = (
+ _DownloadObjectToFileResumable(args.src_url, args.src_obj_metadata,
+ args.dst_url, args.download_file_name,
+ gsutil_api, cls.logger, digesters,
+ component_num=args.component_num,
+ start_byte=args.start_byte,
+ end_byte=args.end_byte))
+
+ crc32c_val = None
+ if 'crc32c' in digesters:
+ crc32c_val = digesters['crc32c'].crcValue
+ return PerformSlicedDownloadReturnValues(
+ args.component_num, crc32c_val, bytes_transferred, server_encoding)
+
+
+def _MaintainSlicedDownloadTrackerFiles(src_obj_metadata, dst_url,
+ download_file_name, logger,
+ api_selector, num_components):
+ """Maintains sliced download tracker files in order to permit resumability.
+
+ Reads or creates a sliced download tracker file representing this object
+ download. Upon an attempt at cross-process resumption, the contents of the
+ sliced download tracker file are verified to make sure a resumption is
+ possible and appropriate. In the case that a resumption should not be
+ attempted, existing component tracker files are deleted (to prevent child
+ processes from attempting resumption), and a new sliced download tracker
+ file is created.
+
+ Args:
+ src_obj_metadata: Metadata from the source object. Must include etag and
+ generation.
+ dst_url: Destination FileUrl.
+ download_file_name: Temporary file name to be used for the download.
+ logger: for outputting log messages.
+ api_selector: The Cloud API implementation used.
+ num_components: The number of components to perform this download with.
+ """
+ assert src_obj_metadata.etag
+ fp = None
+ tracker_file = None
+
+ # This can only happen if the resumable threshold is set higher than the
+ # parallel transfer threshold.
+ if src_obj_metadata.size < ResumableThreshold():
+ return
+
+ tracker_file_name = GetTrackerFilePath(dst_url,
+ TrackerFileType.SLICED_DOWNLOAD,
+ api_selector)
+
+ # Check to see if we should attempt resuming the download.
try:
- if download_strategy is CloudApi.DownloadStrategy.ONE_SHOT:
- fp = open(download_file_name, 'wb')
- elif download_strategy is CloudApi.DownloadStrategy.RESUMABLE:
- # If this is a resumable download, we need to open the file for append and
- # manage a tracker file.
- if open_files_map.get(download_file_name, False):
- # Ensure another process/thread is not already writing to this file.
- raise FileConcurrencySkipError
- open_files.append(download_file_name)
- open_files_map[download_file_name] = True
- fp = open(download_file_name, 'ab')
-
- resuming = ReadOrCreateDownloadTrackerFile(
- src_obj_metadata, dst_url, api_selector)
- if resuming:
- # Find out how far along we are so we can request the appropriate
- # remaining range of the object.
- existing_file_size = GetFileSize(fp, position_to_eof=True)
- if existing_file_size > src_obj_metadata.size:
- DeleteTrackerFile(GetTrackerFilePath(
- dst_url, TrackerFileType.DOWNLOAD, api_selector))
- raise CommandException(
- '%s is larger (%d) than %s (%d).\nDeleting tracker file, so '
- 'if you re-try this download it will start from scratch' %
- (download_file_name, existing_file_size, src_url.object_name,
- src_obj_metadata.size))
- else:
- if existing_file_size == src_obj_metadata.size:
- logger.info(
- 'Download already complete for file %s, skipping download but '
- 'will run integrity checks.', download_file_name)
- download_complete = True
- else:
- download_start_point = existing_file_size
- serialization_dict['progress'] = download_start_point
- logger.info('Resuming download for %s', src_url.url_string)
- # Catch up our digester with the hash data.
- if existing_file_size > TEN_MIB:
- for alg_name in digesters:
- logger.info(
- 'Catching up %s for %s', alg_name, download_file_name)
- with open(download_file_name, 'rb') as hash_fp:
- while True:
- data = hash_fp.read(DEFAULT_FILE_BUFFER_SIZE)
- if not data:
- break
- for alg_name in digesters:
- digesters[alg_name].update(data)
+ fp = open(download_file_name, 'rb')
+ existing_file_size = GetFileSize(fp)
+ # A parallel resumption should be attempted only if the destination file
+ # size is exactly the same as the source size and the tracker file matches.
+ if existing_file_size == src_obj_metadata.size:
+ tracker_file = open(tracker_file_name, 'r')
+ tracker_file_data = json.load(tracker_file)
+ if (tracker_file_data['etag'] == src_obj_metadata.etag and
+ tracker_file_data['generation'] == src_obj_metadata.generation and
+ tracker_file_data['num_components'] == num_components):
+ return
else:
- # Starting a new download, blow away whatever is already there.
- fp.truncate(0)
- fp.seek(0)
+ tracker_file.close()
+ logger.warn('Sliced download tracker file doesn\'t match for '
+ 'download of %s. Restarting download from scratch.' %
+ dst_url.object_name)
+
+ except (IOError, ValueError) as e:
+ # Ignore non-existent file (happens first time a download
+ # is attempted on an object), but warn user for other errors.
+ if isinstance(e, ValueError) or e.errno != errno.ENOENT:
+ logger.warn('Couldn\'t read sliced download tracker file (%s): %s. '
+ 'Restarting download from scratch.' %
+ (tracker_file_name, str(e)))
+ finally:
+ if fp:
+ fp.close()
+ if tracker_file:
+ tracker_file.close()
+
+ # Delete component tracker files to guarantee download starts from scratch.
+ DeleteDownloadTrackerFiles(dst_url, api_selector)
+
+ # Create a new sliced download tracker file to represent this download.
+ try:
+ with open(tracker_file_name, 'w') as tracker_file:
+ tracker_file_data = {'etag': src_obj_metadata.etag,
+ 'generation': src_obj_metadata.generation,
+ 'num_components': num_components}
+ tracker_file.write(json.dumps(tracker_file_data))
+ except IOError as e:
+ RaiseUnwritableTrackerFileException(tracker_file_name, e.strerror)
+
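For illustration only, the sliced download tracker file written above is a small JSON record of the object's etag, generation, and component count; on a later attempt all three must match (and the partial file must already be full-size) for a cross-process resumption to proceed. A sketch with made-up values, using the same field names as the tracker_file_data dict above:

import json

# Hypothetical values; only the field names come from the code above.
tracker_file_data = {'etag': 'CJXfyNzZ08cCEAE=',
                     'generation': 1441927071782600,
                     'num_components': 4}
print json.dumps(tracker_file_data)
# e.g. {"generation": 1441927071782600, "num_components": 4,
#       "etag": "CJXfyNzZ08cCEAE="}  (key order may vary in Python 2)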
+
+class SlicedDownloadFileWrapper(object):
+ """Wraps a file object to be used in GetObjectMedia for sliced downloads.
+ In order to allow resumability, the file object used by each thread in a
+ sliced object download should be wrapped using SlicedDownloadFileWrapper.
+ Passing a SlicedDownloadFileWrapper object to GetObjectMedia will allow the
+ download component tracker file for this component to be updated periodically,
+ while the downloaded bytes are normally written to file.
+ """
+
+ def __init__(self, fp, tracker_file_name, src_obj_metadata, start_byte,
+ end_byte):
+ """Initializes the SlicedDownloadFileWrapper.
+
+ Args:
+ fp: The already-open file object to be used for writing in
+ GetObjectMedia. Data will be written to file starting at the current
+ seek position.
+ tracker_file_name: The name of the tracker file for this component.
+ src_obj_metadata: Metadata from the source object. Must include etag and
+ generation.
+ start_byte: The first byte to be downloaded for this parallel component.
+ end_byte: The last byte to be downloaded for this parallel component.
+ """
+ self._orig_fp = fp
+ self._tracker_file_name = tracker_file_name
+ self._src_obj_metadata = src_obj_metadata
+ self._last_tracker_file_byte = None
+ self._start_byte = start_byte
+ self._end_byte = end_byte
+
+ def write(self, data): # pylint: disable=invalid-name
+ current_file_pos = self._orig_fp.tell()
+ assert (self._start_byte <= current_file_pos and
+ current_file_pos + len(data) <= self._end_byte + 1)
+
+ self._orig_fp.write(data)
+ current_file_pos = self._orig_fp.tell()
+
+ threshold = TRACKERFILE_UPDATE_THRESHOLD
+ if (self._last_tracker_file_byte is None or
+ current_file_pos - self._last_tracker_file_byte > threshold or
+ current_file_pos == self._end_byte + 1):
+ WriteDownloadComponentTrackerFile(
+ self._tracker_file_name, self._src_obj_metadata, current_file_pos)
+ self._last_tracker_file_byte = current_file_pos
+
+ def seek(self, offset, whence=os.SEEK_SET): # pylint: disable=invalid-name
+ if whence == os.SEEK_END:
+ self._orig_fp.seek(offset + self._end_byte + 1)
else:
- raise CommandException('Invalid download strategy %s chosen for'
- 'file %s' % (download_strategy, fp.name))
+ self._orig_fp.seek(offset, whence)
+ assert self._start_byte <= self._orig_fp.tell() <= self._end_byte + 1
+
+ def tell(self): # pylint: disable=invalid-name
+ return self._orig_fp.tell()
+
+ def flush(self): # pylint: disable=invalid-name
+ self._orig_fp.flush()
+
+ def close(self): # pylint: disable=invalid-name
+ if self._orig_fp:
+ self._orig_fp.close()
+
+
+def _PartitionObject(src_url, src_obj_metadata, dst_url,
+ download_file_name):
+ """Partitions an object into components to be downloaded.
+
+ Each component is a byte range of the object. The byte ranges
+ of the returned components are mutually exclusive and collectively
+ exhaustive. The byte ranges are inclusive at both end points.
+
+ Args:
+ src_url: Source CloudUrl.
+ src_obj_metadata: Metadata from the source object.
+ dst_url: Destination FileUrl.
+ download_file_name: Temporary file name to be used for the download.
+
+ Returns:
+ components_to_download: A list of PerformSlicedDownloadObjectToFileArgs
+ to be used in Apply for the sliced download.
+ """
+ sliced_download_component_size = HumanReadableToBytes(
+ config.get('GSUtil', 'sliced_object_download_component_size',
+ DEFAULT_SLICED_OBJECT_DOWNLOAD_COMPONENT_SIZE))
+
+ max_components = config.getint(
+ 'GSUtil', 'sliced_object_download_max_components',
+ DEFAULT_SLICED_OBJECT_DOWNLOAD_MAX_COMPONENTS)
+
+ num_components, component_size = _GetPartitionInfo(
+ src_obj_metadata.size, max_components, sliced_download_component_size)
+
+ components_to_download = []
+ component_lengths = []
+ for i in range(num_components):
+ start_byte = i * component_size
+ end_byte = min((i + 1) * (component_size) - 1, src_obj_metadata.size - 1)
+ component_lengths.append(end_byte - start_byte + 1)
+ components_to_download.append(
+ PerformSlicedDownloadObjectToFileArgs(
+ i, src_url, src_obj_metadata, dst_url, download_file_name,
+ start_byte, end_byte))
+ return components_to_download, component_lengths
+
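For illustration only, the loop above produces contiguous, inclusive byte ranges that exactly cover the object; only the last component can be short. A pure-arithmetic sketch for a hypothetical 250-byte object that _GetPartitionInfo split into three 84-byte components:

object_size = 250
component_size = 84
num_components = 3

for i in range(num_components):
  start_byte = i * component_size
  end_byte = min((i + 1) * component_size - 1, object_size - 1)
  print 'component %d: bytes %d-%d (%d bytes)' % (
      i, start_byte, end_byte, end_byte - start_byte + 1)
# component 0: bytes 0-83 (84 bytes)
# component 1: bytes 84-167 (84 bytes)
# component 2: bytes 168-249 (82 bytes)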
+
+def _DoSlicedDownload(src_url, src_obj_metadata, dst_url, download_file_name,
+ command_obj, logger, copy_exception_handler,
+ api_selector):
+ """Downloads a cloud object to a local file using sliced download.
+
+ Byte ranges are decided for each thread/process, and then the parts are
+ downloaded in parallel.
+
+ Args:
+ src_url: Source CloudUrl.
+ src_obj_metadata: Metadata from the source object.
+ dst_url: Destination FileUrl.
+ download_file_name: Temporary file name to be used for download.
+ command_obj: command object for use in Apply in sliced object downloads.
+ logger: for outputting log messages.
+ copy_exception_handler: For handling copy exceptions during Apply.
+ api_selector: The Cloud API implementation used.
+
+ Returns:
+ (bytes_transferred, crc32c)
+ bytes_transferred: Number of bytes transferred from server this call.
+ crc32c: a crc32c hash value (integer) for the downloaded bytes, or None if
+ crc32c hashing wasn't performed.
+ """
+ components_to_download, component_lengths = _PartitionObject(
+ src_url, src_obj_metadata, dst_url, download_file_name)
+
+ num_components = len(components_to_download)
+ _MaintainSlicedDownloadTrackerFiles(src_obj_metadata, dst_url,
+ download_file_name, logger,
+ api_selector, num_components)
+
+ # Resize the download file so each child process can seek to its start byte.
+ with open(download_file_name, 'ab') as fp:
+ fp.truncate(src_obj_metadata.size)
+
+ cp_results = command_obj.Apply(
+ _PerformSlicedDownloadObjectToFile, components_to_download,
+ copy_exception_handler, arg_checker=gslib.command.DummyArgChecker,
+ parallel_operations_override=True, should_return_results=True)
+
+ if len(cp_results) < num_components:
+ raise CommandException(
+ 'Some components of %s were not downloaded successfully. '
+ 'Please retry this download.' % dst_url.object_name)
+
+ # Crc32c hashes have to be concatenated in the correct order.
+ cp_results = sorted(cp_results, key=attrgetter('component_num'))
+ crc32c = cp_results[0].crc32c
+ if crc32c is not None:
+ for i in range(1, num_components):
+ crc32c = ConcatCrc32c(crc32c, cp_results[i].crc32c,
+ component_lengths[i])
+
+ bytes_transferred = 0
+ expect_gzip = (src_obj_metadata.contentEncoding and
+ src_obj_metadata.contentEncoding.lower().endswith('gzip'))
+ for cp_result in cp_results:
+ bytes_transferred += cp_result.bytes_transferred
+ server_gzip = (cp_result.server_encoding and
+ cp_result.server_encoding.lower().endswith('gzip'))
+ # If the server gzipped any components on the fly, we will have no chance of
+ # properly reconstructing the file.
+ if server_gzip and not expect_gzip:
+ raise CommandException(
+ 'Download of %s failed because the server sent back data with an '
+ 'unexpected encoding.' % dst_url.object_name)
+
+ return bytes_transferred, crc32c
+
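Because each component hashes only its own byte range, the per-component CRC32C values above are folded together left to right, passing in the length of each newly appended component. For illustration only, a small self-check of that property; it assumes gslib is importable and that ConcatCrc32c takes (combined-so-far, next component's CRC, next component's length), matching its use above:

import crcmod.predefined
from gslib.hashing_helper import ConcatCrc32c

data = b'0123456789' * 100          # 1000 bytes of sample data
part_a, part_b = data[:600], data[600:]

whole = crcmod.predefined.Crc('crc-32c')
whole.update(data)

crc_a = crcmod.predefined.Crc('crc-32c')
crc_a.update(part_a)
crc_b = crcmod.predefined.Crc('crc-32c')
crc_b.update(part_b)

# The combined value should equal the CRC of the whole buffer, which is what
# lets a sliced download still be validated against the object's crc32c.
combined = ConcatCrc32c(crc_a.crcValue, crc_b.crcValue, len(part_b))
assert combined == whole.crcValue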
- if not dst_url.IsStream():
- serialization_data = json.dumps(serialization_dict)
+def _DownloadObjectToFileResumable(src_url, src_obj_metadata, dst_url,
+ download_file_name, gsutil_api, logger,
+ digesters, component_num=None, start_byte=0,
+ end_byte=None):
+ """Downloads an object to a local file using the resumable strategy.
+
+ Args:
+ src_url: Source CloudUrl.
+ src_obj_metadata: Metadata from the source object.
+ dst_url: Destination FileUrl.
+ download_file_name: Temporary file name to be used for download.
+ gsutil_api: gsutil Cloud API instance to use for the download.
+ logger: for outputting log messages.
+ digesters: Digesters corresponding to the hash algorithms that will be used
+ for validation.
+ component_num: Which component of a sliced download this call is for, or
+ None if this is not a sliced download.
+ start_byte: The first byte of a byte range for a sliced download.
+ end_byte: The last byte of a byte range for a sliced download.
+
+ Returns:
+ (bytes_transferred, server_encoding)
+ bytes_transferred: Number of bytes transferred from server this call.
+ server_encoding: Content-encoding string if it was detected that the server
+ sent encoded bytes during transfer, None otherwise.
+ """
+ if end_byte is None:
+ end_byte = src_obj_metadata.size - 1
+ download_size = end_byte - start_byte + 1
+
+ is_sliced = component_num is not None
+ api_selector = gsutil_api.GetApiSelector(provider=src_url.scheme)
+ server_encoding = None
+
+ # Used for logging
+ download_name = dst_url.object_name
+ if is_sliced:
+ download_name += ' component %d' % component_num
+
+ try:
+ fp = open(download_file_name, 'r+b')
+ fp.seek(start_byte)
+ api_selector = gsutil_api.GetApiSelector(provider=src_url.scheme)
+ existing_file_size = GetFileSize(fp)
+
+ tracker_file_name, download_start_byte = (
+ ReadOrCreateDownloadTrackerFile(src_obj_metadata, dst_url, logger,
+ api_selector, start_byte,
+ existing_file_size, component_num))
+
+ if download_start_byte < start_byte or download_start_byte > end_byte + 1:
+ DeleteTrackerFile(tracker_file_name)
+ raise CommandException(
+ 'Resumable download start point for %s is not in the correct byte '
+ 'range. Deleting tracker file, so if you re-try this download it '
+ 'will start from scratch' % download_name)
+
+ download_complete = (download_start_byte == start_byte + download_size)
+ resuming = (download_start_byte != start_byte) and not download_complete
+ if resuming:
+ logger.info('Resuming download for %s', download_name)
+ elif download_complete:
+ logger.info(
+ 'Download already complete for %s, skipping download but '
+ 'will run integrity checks.', download_name)
+
+ # This is used for resuming downloads, but also for passing the mediaLink
+ # and size into the download for new downloads so that we can avoid
+ # making an extra HTTP call.
+ serialization_data = GetDownloadSerializationData(
+ src_obj_metadata, progress=download_start_byte)
+
+ if resuming or download_complete:
+ # Catch up our digester with the hash data.
+ bytes_digested = 0
+ total_bytes_to_digest = download_start_byte - start_byte
+ hash_callback = ProgressCallbackWithBackoff(
+ total_bytes_to_digest,
+ FileProgressCallbackHandler(
+ ConstructAnnounceText('Hashing',
+ dst_url.url_string), logger).call)
+
+ while bytes_digested < total_bytes_to_digest:
+ bytes_to_read = min(DEFAULT_FILE_BUFFER_SIZE,
+ total_bytes_to_digest - bytes_digested)
+ data = fp.read(bytes_to_read)
+ bytes_digested += bytes_to_read
+ for alg_name in digesters:
+ digesters[alg_name].update(data)
+ hash_callback.Progress(len(data))
+
+ elif not is_sliced:
+ # Delete file contents and start entire object download from scratch.
+ fp.truncate(0)
+ existing_file_size = 0
progress_callback = FileProgressCallbackHandler(
- ConstructAnnounceText('Downloading', dst_url.url_string),
- logger).call
+ ConstructAnnounceText('Downloading', dst_url.url_string), logger,
+ start_byte, download_size).call
+
if global_copy_helper_opts.test_callback_file:
with open(global_copy_helper_opts.test_callback_file, 'rb') as test_fp:
progress_callback = pickle.loads(test_fp.read()).call
- start_time = time.time()
+ if is_sliced and src_obj_metadata.size >= ResumableThreshold():
+ fp = SlicedDownloadFileWrapper(fp, tracker_file_name, src_obj_metadata,
+ start_byte, end_byte)
+
# TODO: With gzip encoding (which may occur on-the-fly and not be part of
# the object's metadata), when we request a range to resume, it's possible
# that the server will just resend the entire object, which means our
@@ -1811,58 +2175,186 @@ def _DownloadObjectToFile(src_url, src_obj_metadata, dst_url,
# the local file in the case of a failed gzip hash anyway, but it would
# be better if we actively detected this case.
if not download_complete:
+ fp.seek(download_start_byte)
server_encoding = gsutil_api.GetObjectMedia(
src_url.bucket_name, src_url.object_name, fp,
- start_byte=download_start_point, generation=src_url.generation,
- object_size=src_obj_metadata.size,
- download_strategy=download_strategy, provider=src_url.scheme,
- serialization_data=serialization_data, digesters=digesters,
- progress_callback=progress_callback)
-
- end_time = time.time()
-
- # If a custom test method is defined, call it here. For the copy command,
- # test methods are expected to take one argument: an open file pointer,
- # and are used to perturb the open file during download to exercise
- # download error detection.
- if test_method:
- test_method(fp)
+ start_byte=download_start_byte, end_byte=end_byte,
+ generation=src_url.generation, object_size=src_obj_metadata.size,
+ download_strategy=CloudApi.DownloadStrategy.RESUMABLE,
+ provider=src_url.scheme, serialization_data=serialization_data,
+ digesters=digesters, progress_callback=progress_callback)
except ResumableDownloadException as e:
- logger.warning('Caught ResumableDownloadException (%s) for file %s.',
- e.reason, file_name)
+ logger.warning('Caught ResumableDownloadException (%s) for download of %s.',
+ e.reason, download_name)
raise
+ finally:
+ if fp:
+ fp.close()
+
+ bytes_transferred = end_byte - download_start_byte + 1
+ return bytes_transferred, server_encoding
+
+def _DownloadObjectToFileNonResumable(src_url, src_obj_metadata, dst_url,
+ download_file_name, gsutil_api, logger,
+ digesters):
+ """Downloads an object to a local file using the non-resumable strategy.
+
+ Args:
+ src_url: Source CloudUrl.
+ src_obj_metadata: Metadata from the source object.
+ dst_url: Destination FileUrl.
+ download_file_name: Temporary file name to be used for download.
+ gsutil_api: gsutil Cloud API instance to use for the download.
+ logger: for outputting log messages.
+ digesters: Digesters corresponding to the hash algorithms that will be used
+ for validation.
+ Returns:
+ (bytes_transferred, server_encoding)
+ bytes_transferred: Number of bytes transferred from server this call.
+ server_encoding: Content-encoding string if it was detected that the server
+ sent encoded bytes during transfer, None otherwise.
+ """
+ try:
+ fp = open(download_file_name, 'w')
+
+ # This is used to pass the mediaLink and the size into the download so that
+ # we can avoid making an extra HTTP call.
+ serialization_data = GetDownloadSerializationData(src_obj_metadata)
+
+ progress_callback = FileProgressCallbackHandler(
+ ConstructAnnounceText('Downloading', dst_url.url_string), logger).call
+
+ if global_copy_helper_opts.test_callback_file:
+ with open(global_copy_helper_opts.test_callback_file, 'rb') as test_fp:
+ progress_callback = pickle.loads(test_fp.read()).call
+
+ server_encoding = gsutil_api.GetObjectMedia(
+ src_url.bucket_name, src_url.object_name, fp,
+ generation=src_url.generation, object_size=src_obj_metadata.size,
+ download_strategy=CloudApi.DownloadStrategy.ONE_SHOT,
+ provider=src_url.scheme, serialization_data=serialization_data,
+ digesters=digesters, progress_callback=progress_callback)
finally:
if fp:
fp.close()
- for file_name in open_files:
- open_files_map.delete(file_name)
-
- # If we decompressed a content-encoding gzip file on the fly, this may not
- # be accurate, but it is the best we can do without going deep into the
- # underlying HTTP libraries. Note that this value is only used for
- # reporting in log messages; inaccuracy doesn't impact the integrity of the
- # download.
- bytes_transferred = src_obj_metadata.size - download_start_point
+
+ return src_obj_metadata.size, server_encoding
+
+
+def _DownloadObjectToFile(src_url, src_obj_metadata, dst_url,
+ gsutil_api, logger, command_obj,
+ copy_exception_handler, allow_splitting=True):
+ """Downloads an object to a local file.
+
+ Args:
+ src_url: Source CloudUrl.
+ src_obj_metadata: Metadata from the source object.
+ dst_url: Destination FileUrl.
+ gsutil_api: gsutil Cloud API instance to use for the download.
+ logger: for outputting log messages.
+ command_obj: command object for use in Apply in sliced downloads.
+ copy_exception_handler: For handling copy exceptions during Apply.
+ allow_splitting: Whether or not to allow sliced download.
+ Returns:
+ (elapsed_time, bytes_transferred, dst_url, md5), where time elapsed
+ excludes initial GET.
+
+ Raises:
+ FileConcurrencySkipError: if this download is already in progress.
+ CommandException: if other errors encountered.
+ """
+ global open_files_map, open_files_lock
+ if dst_url.object_name.endswith(dst_url.delim):
+ logger.warn('\n'.join(textwrap.wrap(
+ 'Skipping attempt to download to filename ending with slash (%s). This '
+ 'typically happens when using gsutil to download from a subdirectory '
+ 'created by the Cloud Console (https://cloud.google.com/console)'
+ % dst_url.object_name)))
+ return (0, 0, dst_url, '')
+
+ api_selector = gsutil_api.GetApiSelector(provider=src_url.scheme)
+ download_strategy = _SelectDownloadStrategy(dst_url)
+ sliced_download = _ShouldDoSlicedDownload(
+ download_strategy, src_obj_metadata, allow_splitting, logger)
+
+ download_file_name, need_to_unzip = _GetDownloadFile(
+ dst_url, src_obj_metadata, logger)
+
+ # Ensure another process/thread is not already writing to this file.
+ with open_files_lock:
+ if open_files_map.get(download_file_name, False):
+ raise FileConcurrencySkipError
+ open_files_map[download_file_name] = True
+
+ # Set up hash digesters.
+ consider_md5 = src_obj_metadata.md5Hash and not sliced_download
+ hash_algs = GetDownloadHashAlgs(logger, consider_md5=consider_md5,
+ consider_crc32c=src_obj_metadata.crc32c)
+ digesters = dict((alg, hash_algs[alg]()) for alg in hash_algs or {})
+
+ # Tracks whether the server used a gzip encoding.
+ server_encoding = None
+ download_complete = (src_obj_metadata.size == 0)
+ bytes_transferred = 0
+
+ start_time = time.time()
+ if not download_complete:
+ if sliced_download:
+ (bytes_transferred, crc32c) = (
+ _DoSlicedDownload(src_url, src_obj_metadata, dst_url,
+ download_file_name, command_obj, logger,
+ copy_exception_handler, api_selector))
+ if 'crc32c' in digesters:
+ digesters['crc32c'].crcValue = crc32c
+ elif download_strategy is CloudApi.DownloadStrategy.ONE_SHOT:
+ (bytes_transferred, server_encoding) = (
+ _DownloadObjectToFileNonResumable(src_url, src_obj_metadata, dst_url,
+ download_file_name, gsutil_api,
+ logger, digesters))
+ elif download_strategy is CloudApi.DownloadStrategy.RESUMABLE:
+ (bytes_transferred, server_encoding) = (
+ _DownloadObjectToFileResumable(src_url, src_obj_metadata, dst_url,
+ download_file_name, gsutil_api, logger,
+ digesters))
+ else:
+ raise CommandException('Invalid download strategy %s chosen for '
+ 'file %s' % (download_strategy,
+ download_file_name))
+ end_time = time.time()
+
server_gzip = server_encoding and server_encoding.lower().endswith('gzip')
- local_md5 = _ValidateDownloadHashes(logger, src_url, src_obj_metadata,
- dst_url, need_to_unzip, server_gzip,
- digesters, hash_algs, api_selector,
- bytes_transferred)
+ local_md5 = _ValidateAndCompleteDownload(
+ logger, src_url, src_obj_metadata, dst_url, need_to_unzip, server_gzip,
+ digesters, hash_algs, download_file_name, api_selector, bytes_transferred)
+
+ with open_files_lock:
+ open_files_map.delete(download_file_name)
return (end_time - start_time, bytes_transferred, dst_url, local_md5)
-def _GetDownloadZipFileName(file_name):
- """Returns the file name for a temporarily compressed downloaded file."""
- return '%s_.gztmp' % file_name
+def _GetDownloadTempZipFileName(dst_url):
+ """Returns temporary file name for a temporarily compressed download."""
+ return '%s_.gztmp' % dst_url.object_name
+
+
+def _GetDownloadTempFileName(dst_url):
+ """Returns temporary download file name for uncompressed downloads."""
+ return '%s_.gstmp' % dst_url.object_name
-def _ValidateDownloadHashes(logger, src_url, src_obj_metadata, dst_url,
- need_to_unzip, server_gzip, digesters, hash_algs,
- api_selector, bytes_transferred):
- """Validates a downloaded file's integrity.
+def _ValidateAndCompleteDownload(logger, src_url, src_obj_metadata, dst_url,
+ need_to_unzip, server_gzip, digesters,
+ hash_algs, download_file_name,
+ api_selector, bytes_transferred):
+ """Validates and performs necessary operations on a downloaded file.
+
+ Validates the integrity of the downloaded file using hash_algs. If the file
+ was compressed (temporarily), the file will be decompressed. Then, if the
+ integrity of the file was successfully validated, the file will be moved
+ from its temporary download location to its permanent location on disk.
Args:
logger: For outputting log messages.
@@ -1880,6 +2372,7 @@ def _ValidateDownloadHashes(logger, src_url, src_obj_metadata, dst_url,
hash must be recomputed from the local file.
hash_algs: dict of {string, hash algorithm} that can be used if digesters
don't have up-to-date digests.
+ download_file_name: Temporary file name that was used for download.
api_selector: The Cloud API implementation used (used tracker file naming).
bytes_transferred: Number of bytes downloaded (used for logging).
@@ -1887,10 +2380,10 @@ def _ValidateDownloadHashes(logger, src_url, src_obj_metadata, dst_url,
An MD5 of the local file, if one was calculated as part of the integrity
check.
"""
- file_name = dst_url.object_name
- download_file_name = (_GetDownloadZipFileName(file_name) if need_to_unzip else
- file_name)
+ final_file_name = dst_url.object_name
+ file_name = download_file_name
digesters_succeeded = True
+
for alg in digesters:
# If we get a digester with a None algorithm, the underlying
# implementation failed to calculate a digest, so we will need to
@@ -1903,15 +2396,14 @@ def _ValidateDownloadHashes(logger, src_url, src_obj_metadata, dst_url,
local_hashes = _CreateDigestsFromDigesters(digesters)
else:
local_hashes = _CreateDigestsFromLocalFile(
- logger, hash_algs, download_file_name, src_obj_metadata)
+ logger, hash_algs, file_name, final_file_name, src_obj_metadata)
digest_verified = True
hash_invalid_exception = None
try:
- _CheckHashes(logger, src_url, src_obj_metadata, download_file_name,
+ _CheckHashes(logger, src_url, src_obj_metadata, final_file_name,
local_hashes)
- DeleteTrackerFile(GetTrackerFilePath(
- dst_url, TrackerFileType.DOWNLOAD, api_selector))
+ DeleteDownloadTrackerFiles(dst_url, api_selector)
except HashMismatchException, e:
# If an non-gzipped object gets sent with gzip content encoding, the hash
# we calculate will match the gzipped bytes, not the original object. Thus,
@@ -1929,34 +2421,27 @@ def _ValidateDownloadHashes(logger, src_url, src_obj_metadata, dst_url,
hash_invalid_exception = e
digest_verified = False
else:
- DeleteTrackerFile(GetTrackerFilePath(
- dst_url, TrackerFileType.DOWNLOAD, api_selector))
+ DeleteDownloadTrackerFiles(dst_url, api_selector)
if _RENAME_ON_HASH_MISMATCH:
- os.rename(download_file_name,
- download_file_name + _RENAME_ON_HASH_MISMATCH_SUFFIX)
+ os.rename(file_name,
+ final_file_name + _RENAME_ON_HASH_MISMATCH_SUFFIX)
else:
- os.unlink(download_file_name)
+ os.unlink(file_name)
raise
- if server_gzip and not need_to_unzip:
- # Server compressed bytes on-the-fly, thus we need to rename and decompress.
- # We can't decompress on-the-fly because prior to Python 3.2 the gzip
- # module makes a bunch of seek calls on the stream.
- download_file_name = _GetDownloadZipFileName(file_name)
- os.rename(file_name, download_file_name)
-
if need_to_unzip or server_gzip:
# Log that we're uncompressing if the file is big enough that
# decompressing would make it look like the transfer "stalled" at the end.
if bytes_transferred > TEN_MIB:
logger.info(
- 'Uncompressing downloaded tmp file to %s...', file_name)
+ 'Uncompressing temporarily gzipped file to %s...', final_file_name)
- # Downloaded gzipped file to a filename w/o .gz extension, so unzip.
gzip_fp = None
try:
- gzip_fp = gzip.open(download_file_name, 'rb')
- with open(file_name, 'wb') as f_out:
+ # Downloaded temporarily gzipped file, unzip to file without '_.gztmp'
+ # suffix.
+ gzip_fp = gzip.open(file_name, 'rb')
+ with open(final_file_name, 'wb') as f_out:
data = gzip_fp.read(GZIP_CHUNK_SIZE)
while data:
f_out.write(data)
@@ -1972,19 +2457,19 @@ def _ValidateDownloadHashes(logger, src_url, src_obj_metadata, dst_url,
if gzip_fp:
gzip_fp.close()
- os.unlink(download_file_name)
+ os.unlink(file_name)
+ file_name = final_file_name
if not digest_verified:
try:
# Recalculate hashes on the unzipped local file.
- local_hashes = _CreateDigestsFromLocalFile(logger, hash_algs, file_name,
- src_obj_metadata)
- _CheckHashes(logger, src_url, src_obj_metadata, file_name, local_hashes)
- DeleteTrackerFile(GetTrackerFilePath(
- dst_url, TrackerFileType.DOWNLOAD, api_selector))
+ local_hashes = _CreateDigestsFromLocalFile(
+ logger, hash_algs, file_name, final_file_name, src_obj_metadata)
+ _CheckHashes(logger, src_url, src_obj_metadata, final_file_name,
+ local_hashes)
+ DeleteDownloadTrackerFiles(dst_url, api_selector)
except HashMismatchException:
- DeleteTrackerFile(GetTrackerFilePath(
- dst_url, TrackerFileType.DOWNLOAD, api_selector))
+ DeleteDownloadTrackerFiles(dst_url, api_selector)
if _RENAME_ON_HASH_MISMATCH:
os.rename(file_name,
file_name + _RENAME_ON_HASH_MISMATCH_SUFFIX)
@@ -1992,6 +2477,13 @@ def _ValidateDownloadHashes(logger, src_url, src_obj_metadata, dst_url,
os.unlink(file_name)
raise
+ if file_name != final_file_name:
+ # Data is still in a temporary file, so move it to a permanent location.
+ if os.path.exists(final_file_name):
+ os.unlink(final_file_name)
+ os.rename(file_name,
+ final_file_name)
+
if 'md5' in local_hashes:
return local_hashes['md5']
@@ -2123,7 +2615,7 @@ def _CopyObjToObjDaisyChainMode(src_url, src_obj_metadata, dst_url,
# pylint: disable=too-many-statements
def PerformCopy(logger, src_url, dst_url, gsutil_api, command_obj,
copy_exception_handler, allow_splitting=True,
- headers=None, manifest=None, gzip_exts=None, test_method=None):
+ headers=None, manifest=None, gzip_exts=None):
"""Performs copy from src_url to dst_url, handling various special cases.
Args:
@@ -2131,14 +2623,14 @@ def PerformCopy(logger, src_url, dst_url, gsutil_api, command_obj,
src_url: Source StorageUrl.
dst_url: Destination StorageUrl.
gsutil_api: gsutil Cloud API instance to use for the copy.
- command_obj: command object for use in Apply in parallel composite uploads.
+ command_obj: command object for use in Apply in parallel composite uploads
+ and sliced object downloads.
copy_exception_handler: for handling copy exceptions during Apply.
allow_splitting: Whether to allow the file to be split into component
- pieces for an parallel composite upload.
+ pieces for a parallel composite upload or download.
headers: optional headers to use for the copy operation.
manifest: optional manifest for tracking copy operations.
gzip_exts: List of file extensions to gzip for uploads, if any.
- test_method: optional test method for modifying files during unit tests.
Returns:
(elapsed_time, bytes_transferred, version-specific dst_url) excluding
@@ -2194,7 +2686,7 @@ def PerformCopy(logger, src_url, dst_url, gsutil_api, command_obj,
else:
# Just get the fields needed to validate the download.
src_obj_fields = ['crc32c', 'contentEncoding', 'contentType', 'etag',
- 'mediaLink', 'md5Hash', 'size']
+ 'mediaLink', 'md5Hash', 'size', 'generation']
if (src_url.scheme == 's3' and
global_copy_helper_opts.skip_unsupported_objects):
@@ -2231,8 +2723,14 @@ def PerformCopy(logger, src_url, dst_url, gsutil_api, command_obj,
try:
src_obj_filestream = GetStreamFromFileUrl(src_url)
except Exception, e: # pylint: disable=broad-except
- raise CommandException('Error opening file "%s": %s.' % (src_url,
- e.message))
+ if command_obj.continue_on_error:
+ message = 'Error copying %s: %s' % (src_url, str(e))
+ command_obj.op_failure_count += 1
+ logger.error(message)
+ return
+ else:
+ raise CommandException('Error opening file "%s": %s.' % (src_url,
+ e.message))
if src_url.IsStream():
src_obj_size = None
else:
@@ -2308,7 +2806,9 @@ def PerformCopy(logger, src_url, dst_url, gsutil_api, command_obj,
if src_url.IsCloudUrl():
if dst_url.IsFileUrl():
return _DownloadObjectToFile(src_url, src_obj_metadata, dst_url,
- gsutil_api, logger, test_method=test_method)
+ gsutil_api, logger, command_obj,
+ copy_exception_handler,
+ allow_splitting=allow_splitting)
elif copy_in_the_cloud:
return _CopyObjToObjInTheCloud(src_url, src_obj_metadata, dst_url,
dst_obj_metadata, preconditions,
@@ -2499,27 +2999,8 @@ def GetPathBeforeFinalDir(url):
return url.url_string.rstrip(sep).rpartition(sep)[0]
-def _DivideAndCeil(dividend, divisor):
- """Returns ceil(dividend / divisor).
-
- Takes care to avoid the pitfalls of floating point arithmetic that could
- otherwise yield the wrong result for large numbers.
-
- Args:
- dividend: Dividend for the operation.
- divisor: Divisor for the operation.
-
- Returns:
- Quotient.
- """
- quotient = dividend // divisor
- if (dividend % divisor) != 0:
- quotient += 1
- return quotient
-
-
def _GetPartitionInfo(file_size, max_components, default_component_size):
- """Gets info about a file partition for parallel composite uploads.
+ """Gets info about a file partition for parallel file/object transfers.
Args:
file_size: The number of bytes in the file to be partitioned.
@@ -2532,22 +3013,29 @@ def _GetPartitionInfo(file_size, max_components, default_component_size):
file_size != 0 (mod num_components)).
"""
# num_components = ceil(file_size / default_component_size)
- num_components = _DivideAndCeil(file_size, default_component_size)
+ num_components = DivideAndCeil(file_size, default_component_size)
# num_components must be in the range [2, max_components]
num_components = max(min(num_components, max_components), 2)
# component_size = ceil(file_size / num_components)
- component_size = _DivideAndCeil(file_size, num_components)
+ component_size = DivideAndCeil(file_size, num_components)
return (num_components, component_size)
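For illustration only, with DivideAndCeil now shared from gslib.util the sizing above is two ceiling divisions plus a clamp; a worked example with made-up numbers (a 15 GiB file, 150 MiB target component size, maximum of 32 components):

from gslib.util import DivideAndCeil

MIB = 1024 * 1024
file_size = 15 * 1024 * MIB         # hypothetical 15 GiB file
default_component_size = 150 * MIB  # hypothetical target component size
max_components = 32                 # hypothetical upper bound

num_components = DivideAndCeil(file_size, default_component_size)  # ceil -> 103
num_components = max(min(num_components, max_components), 2)       # clamp -> 32
component_size = DivideAndCeil(file_size, num_components)          # 480 MiB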
-def _DeleteObjectFn(cls, url_to_delete, thread_state=None):
- """Wrapper function to be used with command.Apply()."""
+def _DeleteTempComponentObjectFn(cls, url_to_delete, thread_state=None):
+ """Wrapper func to be used with command.Apply to delete temporary objects."""
gsutil_api = GetCloudApiInstance(cls, thread_state)
- gsutil_api.DeleteObject(
- url_to_delete.bucket_name, url_to_delete.object_name,
- generation=url_to_delete.generation, provider=url_to_delete.scheme)
+ try:
+ gsutil_api.DeleteObject(
+ url_to_delete.bucket_name, url_to_delete.object_name,
+ generation=url_to_delete.generation, provider=url_to_delete.scheme)
+ except NotFoundException:
+ # The temporary object could already be gone if a retry was
+ # issued at a lower layer but the original request succeeded.
+ # Barring other errors, the top-level command should still report success,
+ # so don't raise here.
+ pass
def _ParseParallelUploadTrackerFile(tracker_file, tracker_file_lock):