Index: tools/telemetry/third_party/gsutilz/gslib/copy_helper.py |
diff --git a/tools/telemetry/third_party/gsutilz/gslib/copy_helper.py b/tools/telemetry/third_party/gsutilz/gslib/copy_helper.py |
index 456b15cf47a011e3c5b71a426c46dffeb520a833..aea4195bd028749f71ca361e5fd8a7dda37a9070 100644 |
--- a/tools/telemetry/third_party/gsutilz/gslib/copy_helper.py |
+++ b/tools/telemetry/third_party/gsutilz/gslib/copy_helper.py |
@@ -27,7 +27,7 @@ from hashlib import md5 |
import json |
import logging |
import mimetypes |
-import multiprocessing |
+from operator import attrgetter |
import os |
import pickle |
import random |
@@ -53,10 +53,13 @@ from gslib.cloud_api import ResumableDownloadException |
from gslib.cloud_api import ResumableUploadAbortException |
from gslib.cloud_api import ResumableUploadException |
from gslib.cloud_api import ResumableUploadStartOverException |
-from gslib.cloud_api_helper import GetDownloadSerializationDict |
+from gslib.cloud_api_helper import GetDownloadSerializationData |
from gslib.commands.compose import MAX_COMPOSE_ARITY |
from gslib.commands.config import DEFAULT_PARALLEL_COMPOSITE_UPLOAD_COMPONENT_SIZE |
from gslib.commands.config import DEFAULT_PARALLEL_COMPOSITE_UPLOAD_THRESHOLD |
+from gslib.commands.config import DEFAULT_SLICED_OBJECT_DOWNLOAD_COMPONENT_SIZE |
+from gslib.commands.config import DEFAULT_SLICED_OBJECT_DOWNLOAD_MAX_COMPONENTS |
+from gslib.commands.config import DEFAULT_SLICED_OBJECT_DOWNLOAD_THRESHOLD |
from gslib.cs_api_map import ApiSelector |
from gslib.daisy_chain_wrapper import DaisyChainWrapper |
from gslib.exception import CommandException |
@@ -65,11 +68,13 @@ from gslib.file_part import FilePart |
from gslib.hashing_helper import Base64EncodeHash |
from gslib.hashing_helper import CalculateB64EncodedMd5FromContents |
from gslib.hashing_helper import CalculateHashesFromContents |
+from gslib.hashing_helper import CHECK_HASH_IF_FAST_ELSE_FAIL |
+from gslib.hashing_helper import CHECK_HASH_NEVER |
+from gslib.hashing_helper import ConcatCrc32c |
from gslib.hashing_helper import GetDownloadHashAlgs |
from gslib.hashing_helper import GetUploadHashAlgs |
from gslib.hashing_helper import HashingFileUploadWrapper |
-from gslib.parallelism_framework_util import ThreadAndProcessSafeDict |
-from gslib.parallelism_framework_util import ThreadSafeDict |
+from gslib.parallelism_framework_util import AtomicDict |
from gslib.progress_callback import ConstructAnnounceText |
from gslib.progress_callback import FileProgressCallbackHandler |
from gslib.progress_callback import ProgressCallbackWithBackoff |
@@ -77,11 +82,13 @@ from gslib.resumable_streaming_upload import ResumableStreamingJsonUploadWrapper |
from gslib.storage_url import ContainsWildcard |
from gslib.storage_url import StorageUrlFromString |
from gslib.third_party.storage_apitools import storage_v1_messages as apitools_messages |
+from gslib.tracker_file import DeleteDownloadTrackerFiles |
from gslib.tracker_file import DeleteTrackerFile |
from gslib.tracker_file import GetTrackerFilePath |
from gslib.tracker_file import RaiseUnwritableTrackerFileException |
from gslib.tracker_file import ReadOrCreateDownloadTrackerFile |
from gslib.tracker_file import TrackerFileType |
+from gslib.tracker_file import WriteDownloadComponentTrackerFile |
from gslib.translation_helper import AddS3MarkerAclToObjectMetadata |
from gslib.translation_helper import CopyObjectMetadata |
from gslib.translation_helper import DEFAULT_CONTENT_TYPE |
@@ -89,8 +96,11 @@ from gslib.translation_helper import GenerationFromUrlAndString |
from gslib.translation_helper import ObjectMetadataFromHeaders |
from gslib.translation_helper import PreconditionsFromHeaders |
from gslib.translation_helper import S3MarkerAclFromObjectMetadata |
+from gslib.util import CheckFreeSpace |
+from gslib.util import CheckMultiprocessingAvailableAndInit |
from gslib.util import CreateLock |
from gslib.util import DEFAULT_FILE_BUFFER_SIZE |
+from gslib.util import DivideAndCeil |
from gslib.util import GetCloudApiInstance |
from gslib.util import GetFileSize |
from gslib.util import GetJsonResumableChunkSize |
@@ -102,39 +112,34 @@ from gslib.util import IS_WINDOWS |
from gslib.util import IsCloudSubdirPlaceholder |
from gslib.util import MakeHumanReadable |
from gslib.util import MIN_SIZE_COMPUTE_LOGGING |
-from gslib.util import MultiprocessingIsAvailable |
from gslib.util import ResumableThreshold |
from gslib.util import TEN_MIB |
+from gslib.util import UsingCrcmodExtension |
from gslib.util import UTF8 |
from gslib.wildcard_iterator import CreateWildcardIterator |
# pylint: disable=g-import-not-at-top |
if IS_WINDOWS: |
import msvcrt |
- from ctypes import c_int |
- from ctypes import c_uint64 |
- from ctypes import c_char_p |
- from ctypes import c_wchar_p |
- from ctypes import windll |
- from ctypes import POINTER |
- from ctypes import WINFUNCTYPE |
- from ctypes import WinError |
# Declare copy_helper_opts as a global because namedtuple isn't aware of |
# assigning to a class member (which breaks pickling done by multiprocessing). |
# For details see |
# http://stackoverflow.com/questions/16377215/how-to-pickle-a-namedtuple-instance-correctly |
-# Similarly can't pickle logger. |
# pylint: disable=global-at-module-level |
-global global_copy_helper_opts, global_logger |
+global global_copy_helper_opts |
# In-memory map of local files that are currently opened for write. Used to |
# ensure that if we write to the same file twice (say, for example, because the |
# user specified two identical source URLs), the writes occur serially. |
-global open_files_map |
+global open_files_map, open_files_lock |
open_files_map = ( |
- ThreadSafeDict() if (IS_WINDOWS or not MultiprocessingIsAvailable()[0]) |
- else ThreadAndProcessSafeDict(multiprocessing.Manager())) |
+ AtomicDict() if not CheckMultiprocessingAvailableAndInit().is_available |
+ else AtomicDict(manager=gslib.util.manager)) |
+ |
+# We don't allow multiple processes on Windows, so using a process-safe lock |
+# would be unnecessary. |
+open_files_lock = CreateLock() |
# For debugging purposes; if True, files and objects that fail hash validation |
# will be saved with the below suffix appended. |
@@ -178,6 +183,15 @@ PerformParallelUploadFileToObjectArgs = namedtuple( |
'filename file_start file_length src_url dst_url canned_acl ' |
'content_type tracker_file tracker_file_lock') |
+PerformSlicedDownloadObjectToFileArgs = namedtuple( |
+ 'PerformSlicedDownloadObjectToFileArgs', |
+ 'component_num src_url src_obj_metadata dst_url download_file_name ' |
+ 'start_byte end_byte') |
+ |
+PerformSlicedDownloadReturnValues = namedtuple( |
+ 'PerformSlicedDownloadReturnValues', |
+ 'component_num crc32c bytes_transferred server_encoding') |
+ |
ObjectFromTracker = namedtuple('ObjectFromTracker', |
'object_name generation') |
@@ -188,13 +202,23 @@ ObjectFromTracker = namedtuple('ObjectFromTracker', |
# Chunk size to use while zipping/unzipping gzip files. |
GZIP_CHUNK_SIZE = 8192 |
+# Number of bytes that must be transferred between updates of a sliced |
+# download component tracker file. |
+TRACKERFILE_UPDATE_THRESHOLD = TEN_MIB |
+ |
PARALLEL_COMPOSITE_SUGGESTION_THRESHOLD = 150 * 1024 * 1024 |
# S3 requires special Multipart upload logic (that we currently don't implement) |
# for files > 5GiB in size. |
S3_MAX_UPLOAD_SIZE = 5 * 1024 * 1024 * 1024 |
-suggested_parallel_composites = False |
+# TODO: Create a multiprocessing framework value allocator, then use it instead |
+# of a dict. |
+global suggested_sliced_transfers, suggested_sliced_transfers_lock |
+suggested_sliced_transfers = ( |
+ AtomicDict() if not CheckMultiprocessingAvailableAndInit().is_available |
+ else AtomicDict(manager=gslib.util.manager)) |
+suggested_sliced_transfers_lock = CreateLock() |
class FileConcurrencySkipError(Exception): |
@@ -206,7 +230,7 @@ def _RmExceptionHandler(cls, e): |
cls.logger.error(str(e)) |
-def _ParallelUploadCopyExceptionHandler(cls, e): |
+def _ParallelCopyExceptionHandler(cls, e): |
"""Simple exception handler to allow post-completion status.""" |
cls.logger.error(str(e)) |
cls.op_failure_count += 1 |
@@ -248,7 +272,7 @@ def _PerformParallelUploadFileToObject(cls, args, thread_state=None): |
ret = _UploadFileToObject(args.src_url, fp, args.file_length, |
args.dst_url, dst_object_metadata, |
preconditions, gsutil_api, cls.logger, cls, |
- _ParallelUploadCopyExceptionHandler, |
+ _ParallelCopyExceptionHandler, |
gzip_exts=None, allow_splitting=False) |
finally: |
if global_copy_helper_opts.canned_acl: |
@@ -641,23 +665,23 @@ def _CreateDigestsFromDigesters(digesters): |
return digests |
-def _CreateDigestsFromLocalFile(logger, algs, file_name, src_obj_metadata): |
+def _CreateDigestsFromLocalFile(logger, algs, file_name, final_file_name, |
+ src_obj_metadata): |
"""Creates a base64 CRC32C and/or MD5 digest from file_name. |
Args: |
- logger: for outputting log messages. |
- algs: list of algorithms to compute. |
- file_name: file to digest. |
- src_obj_metadata: metadta of source object. |
+ logger: For outputting log messages. |
+ algs: List of algorithms to compute. |
+ file_name: File to digest. |
+ final_file_name: Permanent location to be used for the downloaded file |
+ after validation (used for logging). |
+ src_obj_metadata: Metadata of source object. |
Returns: |
Dict of algorithm name : base 64 encoded digest |
""" |
hash_dict = {} |
if 'md5' in algs: |
- if src_obj_metadata.size and src_obj_metadata.size > TEN_MIB: |
- logger.info( |
- 'Computing MD5 for %s...', file_name) |
hash_dict['md5'] = md5() |
if 'crc32c' in algs: |
hash_dict['crc32c'] = crcmod.predefined.Crc('crc-32c') |
@@ -666,7 +690,8 @@ def _CreateDigestsFromLocalFile(logger, algs, file_name, src_obj_metadata): |
fp, hash_dict, ProgressCallbackWithBackoff( |
src_obj_metadata.size, |
FileProgressCallbackHandler( |
- ConstructAnnounceText('Hashing', file_name), logger).call)) |
+ ConstructAnnounceText('Hashing', final_file_name), |
+ logger).call)) |
digests = {} |
for alg_name, digest in hash_dict.iteritems(): |
digests[alg_name] = Base64EncodeHash(digest.hexdigest()) |
@@ -730,7 +755,8 @@ def _CheckHashes(logger, obj_url, obj_metadata, file_name, digests, |
logger: for outputting log messages. |
obj_url: CloudUrl for cloud object. |
obj_metadata: Cloud Object being downloaded from or uploaded to. |
- file_name: Local file name on disk being downloaded to or uploaded from. |
+ file_name: Local file name on disk being downloaded to or uploaded from |
+ (used only for logging). |
digests: Computed Digests for the object. |
is_upload: If true, comparing for an uploaded object (controls logging). |
@@ -985,9 +1011,10 @@ def _DoParallelCompositeUpload(fp, src_url, dst_url, dst_obj_metadata, |
# those that were uploaded by a previous, failed run and have since |
# changed (but still have an old generation lying around). |
objects_to_delete = components + existing_objects_to_delete |
- command_obj.Apply(_DeleteObjectFn, objects_to_delete, _RmExceptionHandler, |
- arg_checker=gslib.command.DummyArgChecker, |
- parallel_operations_override=True) |
+ command_obj.Apply( |
+ _DeleteTempComponentObjectFn, objects_to_delete, _RmExceptionHandler, |
+ arg_checker=gslib.command.DummyArgChecker, |
+ parallel_operations_override=True) |
except Exception: # pylint: disable=broad-except |
# If some of the delete calls fail, don't cause the whole command to |
# fail. The copy was successful iff the compose call succeeded, so |
@@ -1025,7 +1052,7 @@ def _ShouldDoParallelCompositeUpload(logger, allow_splitting, src_url, dst_url, |
Returns: |
True iff a parallel upload should be performed on the source file. |
""" |
- global suggested_parallel_composites |
+  global suggested_sliced_transfers, suggested_sliced_transfers_lock |
parallel_composite_upload_threshold = HumanReadableToBytes(config.get( |
'GSUtil', 'parallel_composite_upload_threshold', |
DEFAULT_PARALLEL_COMPOSITE_UPLOAD_THRESHOLD)) |
@@ -1041,17 +1068,18 @@ def _ShouldDoParallelCompositeUpload(logger, allow_splitting, src_url, dst_url, |
# TODO: Once compiled crcmod is being distributed by major Linux distributions |
# remove this check. |
if (all_factors_but_size and parallel_composite_upload_threshold == 0 |
- and file_size >= PARALLEL_COMPOSITE_SUGGESTION_THRESHOLD |
- and not suggested_parallel_composites): |
- logger.info('\n'.join(textwrap.wrap( |
- '==> NOTE: You are uploading one or more large file(s), which would ' |
- 'run significantly faster if you enable parallel composite uploads. ' |
- 'This feature can be enabled by editing the ' |
- '"parallel_composite_upload_threshold" value in your .boto ' |
- 'configuration file. However, note that if you do this you and any ' |
- 'users that download such composite files will need to have a compiled ' |
- 'crcmod installed (see "gsutil help crcmod").')) + '\n') |
- suggested_parallel_composites = True |
+ and file_size >= PARALLEL_COMPOSITE_SUGGESTION_THRESHOLD): |
+ with suggested_sliced_transfers_lock: |
+ if not suggested_sliced_transfers.get('suggested'): |
+ logger.info('\n'.join(textwrap.wrap( |
+ '==> NOTE: You are uploading one or more large file(s), which ' |
+ 'would run significantly faster if you enable parallel composite ' |
+ 'uploads. This feature can be enabled by editing the ' |
+ '"parallel_composite_upload_threshold" value in your .boto ' |
+ 'configuration file. However, note that if you do this you and any ' |
+ 'users that download such composite files will need to have a ' |
+ 'compiled crcmod installed (see "gsutil help crcmod").')) + '\n') |
+ suggested_sliced_transfers['suggested'] = True |
return (all_factors_but_size |
and parallel_composite_upload_threshold > 0 |
@@ -1104,34 +1132,45 @@ def ExpandUrlToSingleBlr(url_str, gsutil_api, debug, project_id, |
if storage_url.IsBucket(): |
return (storage_url, True) |
- # For object/prefix URLs check 3 cases: (a) if the name ends with '/' treat |
- # as a subdir; otherwise, use the wildcard iterator with url to |
- # find if (b) there's a Prefix matching url, or (c) name is of form |
- # dir_$folder$ (and in both these cases also treat dir as a subdir). |
- # Cloud subdirs are always considered to be an existing container. |
- if IsCloudSubdirPlaceholder(storage_url): |
- return (storage_url, True) |
+ # For object/prefix URLs, there are four cases that indicate the destination |
+ # is a cloud subdirectory; these are always considered to be an existing |
+ # container. Checking each case allows gsutil to provide Unix-like |
+ # destination folder semantics, but requires up to three HTTP calls, noted |
+ # below. |
- # Check for the special case where we have a folder marker object. |
- folder_expansion = CreateWildcardIterator( |
- storage_url.versionless_url_string + '_$folder$', gsutil_api, |
- debug=debug, project_id=project_id).IterAll( |
- bucket_listing_fields=['name']) |
- for blr in folder_expansion: |
+ # Case 1: If a placeholder object ending with '/' exists. |
+ if IsCloudSubdirPlaceholder(storage_url): |
return (storage_url, True) |
- blr_expansion = CreateWildcardIterator(url_str, gsutil_api, |
- debug=debug, |
- project_id=project_id).IterAll( |
- bucket_listing_fields=['name']) |
+ # HTTP call to make an eventually consistent check for a matching prefix, |
+ # _$folder$, or empty listing. |
expansion_empty = True |
- for blr in blr_expansion: |
+ list_iterator = gsutil_api.ListObjects( |
+ storage_url.bucket_name, prefix=storage_url.object_name, delimiter='/', |
+ provider=storage_url.scheme, fields=['prefixes', 'items/name']) |
+ for obj_or_prefix in list_iterator: |
+ # To conserve HTTP calls for the common case, we make a single listing |
+ # that covers prefixes and object names. Listing object names covers the |
+ # _$folder$ case and the nonexistent-object-as-subdir case. However, if |
+ # there are many existing objects for which the target URL is an exact |
+ # prefix, this listing could be paginated and span multiple HTTP calls. |
+    # If this case becomes common, we could heuristically abort the |
+ # listing operation after the first page of results and just query for the |
+ # _$folder$ object directly using GetObjectMetadata. |
expansion_empty = False |
- if blr.IsPrefix(): |
+ |
+ if obj_or_prefix.datatype == CloudApi.CsObjectOrPrefixType.PREFIX: |
+ # Case 2: If there is a matching prefix when listing the destination URL. |
+ return (storage_url, True) |
+ elif (obj_or_prefix.datatype == CloudApi.CsObjectOrPrefixType.OBJECT and |
+ obj_or_prefix.data.name == storage_url.object_name + '_$folder$'): |
+ # Case 3: If a placeholder object matching destination + _$folder$ |
+ # exists. |
return (storage_url, True) |
- return (storage_url, |
- expansion_empty and treat_nonexistent_object_as_subdir) |
+ # Case 4: If no objects/prefixes matched, and nonexistent objects should be |
+ # treated as subdirectories. |
+ return (storage_url, expansion_empty and treat_nonexistent_object_as_subdir) |
def FixWindowsNaming(src_url, dst_url): |
@@ -1242,46 +1281,6 @@ def _CopyObjToObjInTheCloud(src_url, src_obj_metadata, dst_url, |
dst_obj.md5Hash) |
-def _CheckFreeSpace(path): |
- """Return path/drive free space (in bytes).""" |
- if IS_WINDOWS: |
- # pylint: disable=g-import-not-at-top |
- try: |
- # pylint: disable=invalid-name |
- get_disk_free_space_ex = WINFUNCTYPE(c_int, c_wchar_p, |
- POINTER(c_uint64), |
- POINTER(c_uint64), |
- POINTER(c_uint64)) |
- get_disk_free_space_ex = get_disk_free_space_ex( |
- ('GetDiskFreeSpaceExW', windll.kernel32), ( |
- (1, 'lpszPathName'), |
- (2, 'lpFreeUserSpace'), |
- (2, 'lpTotalSpace'), |
- (2, 'lpFreeSpace'),)) |
- except AttributeError: |
- get_disk_free_space_ex = WINFUNCTYPE(c_int, c_char_p, |
- POINTER(c_uint64), |
- POINTER(c_uint64), |
- POINTER(c_uint64)) |
- get_disk_free_space_ex = get_disk_free_space_ex( |
- ('GetDiskFreeSpaceExA', windll.kernel32), ( |
- (1, 'lpszPathName'), |
- (2, 'lpFreeUserSpace'), |
- (2, 'lpTotalSpace'), |
- (2, 'lpFreeSpace'),)) |
- |
- def GetDiskFreeSpaceExErrCheck(result, unused_func, args): |
- if not result: |
- raise WinError() |
- return args[1].value |
- get_disk_free_space_ex.errcheck = GetDiskFreeSpaceExErrCheck |
- |
- return get_disk_free_space_ex(os.getenv('SystemDrive')) |
- else: |
- (_, f_frsize, _, _, f_bavail, _, _, _, _, _) = os.statvfs(path) |
- return f_frsize * f_bavail |
- |
- |
def _SetContentTypeFromFile(src_url, dst_obj_metadata): |
"""Detects and sets Content-Type if src_url names a local file. |
@@ -1509,7 +1508,7 @@ def _CompressFileForUpload(src_url, src_obj_filestream, src_obj_size, logger): |
# Check for temp space. Assume the compressed object is at most 2x |
# the size of the object (normally should compress to smaller than |
# the object) |
- if _CheckFreeSpace(gzip_path) < 2*int(src_obj_size): |
+ if CheckFreeSpace(gzip_path) < 2*int(src_obj_size): |
raise CommandException('Inadequate temp space available to compress ' |
'%s. See the CHANGING TEMP DIRECTORIES section ' |
'of "gsutil help cp" for more info.' % src_url) |
@@ -1657,30 +1656,26 @@ def _UploadFileToObject(src_url, src_obj_filestream, src_obj_size, |
uploaded_object.md5Hash) |
-# TODO: Refactor this long function into smaller pieces. |
-# pylint: disable=too-many-statements |
-def _DownloadObjectToFile(src_url, src_obj_metadata, dst_url, |
- gsutil_api, logger, test_method=None): |
- """Downloads an object to a local file. |
+def _GetDownloadFile(dst_url, src_obj_metadata, logger): |
+ """Creates a new download file, and deletes the file that will be replaced. |
+ |
+ Names and creates a temporary file for this download. Also, if there is an |
+ existing file at the path where this file will be placed after the download |
+ is completed, that file will be deleted. |
Args: |
- src_url: Source CloudUrl. |
- src_obj_metadata: Metadata from the source object. |
dst_url: Destination FileUrl. |
- gsutil_api: gsutil Cloud API instance to use for the download. |
+ src_obj_metadata: Metadata from the source object. |
logger: for outputting log messages. |
- test_method: Optional test method for modifying the file before validation |
- during unit tests. |
- Returns: |
- (elapsed_time, bytes_transferred, dst_url, md5), excluding overhead like |
- initial GET. |
- Raises: |
- CommandException: if errors encountered. |
+ Returns: |
+ (download_file_name, need_to_unzip) |
+ download_file_name: The name of the temporary file to which the object will |
+ be downloaded. |
+    need_to_unzip: If true, a temporary gzip file was used and must be |
+ uncompressed as part of validation. |
""" |
- global open_files_map |
- file_name = dst_url.object_name |
- dir_name = os.path.dirname(file_name) |
+ dir_name = os.path.dirname(dst_url.object_name) |
if dir_name and not os.path.exists(dir_name): |
# Do dir creation in try block so can ignore case where dir already |
# exists. This is needed to avoid a race condition when running gsutil |
@@ -1690,120 +1685,489 @@ def _DownloadObjectToFile(src_url, src_obj_metadata, dst_url, |
except OSError, e: |
if e.errno != errno.EEXIST: |
raise |
- api_selector = gsutil_api.GetApiSelector(provider=src_url.scheme) |
+ |
+ need_to_unzip = False |
# For gzipped objects download to a temp file and unzip. For the XML API, |
- # the represents the result of a HEAD request. For the JSON API, this is |
+ # this represents the result of a HEAD request. For the JSON API, this is |
# the stored encoding which the service may not respect. However, if the |
# server sends decompressed bytes for a file that is stored compressed |
# (double compressed case), there is no way we can validate the hash and |
# we will fail our hash check for the object. |
if (src_obj_metadata.contentEncoding and |
src_obj_metadata.contentEncoding.lower().endswith('gzip')): |
- # We can't use tempfile.mkstemp() here because we need a predictable |
- # filename for resumable downloads. |
- download_file_name = _GetDownloadZipFileName(file_name) |
+ need_to_unzip = True |
+ download_file_name = _GetDownloadTempZipFileName(dst_url) |
logger.info( |
'Downloading to temp gzip filename %s', download_file_name) |
- need_to_unzip = True |
else: |
- download_file_name = file_name |
- need_to_unzip = False |
+ download_file_name = _GetDownloadTempFileName(dst_url) |
- if download_file_name.endswith(dst_url.delim): |
- logger.warn('\n'.join(textwrap.wrap( |
- 'Skipping attempt to download to filename ending with slash (%s). This ' |
- 'typically happens when using gsutil to download from a subdirectory ' |
- 'created by the Cloud Console (https://cloud.google.com/console)' |
- % download_file_name))) |
- return (0, 0, dst_url, '') |
+ # If a file exists at the permanent destination (where the file will be moved |
+ # after the download is completed), delete it here to reduce disk space |
+ # requirements. |
+ if os.path.exists(dst_url.object_name): |
+ os.unlink(dst_url.object_name) |
- # Set up hash digesters. |
+ # Downloads open the temporary download file in r+b mode, which requires it |
+ # to already exist, so we create it here if it doesn't exist already. |
+ fp = open(download_file_name, 'ab') |
+ fp.close() |
+ return download_file_name, need_to_unzip |
+ |
+ |
+def _ShouldDoSlicedDownload(download_strategy, src_obj_metadata, |
+ allow_splitting, logger): |
+ """Determines whether the sliced download strategy should be used. |
+ |
+ Args: |
+ download_strategy: CloudApi download strategy. |
+ src_obj_metadata: Metadata from the source object. |
+ allow_splitting: If false, then this function returns false. |
+ logger: logging.Logger for log message output. |
+ |
+ Returns: |
+ True iff a sliced download should be performed on the source file. |
+ """ |
+ sliced_object_download_threshold = HumanReadableToBytes(config.get( |
+ 'GSUtil', 'sliced_object_download_threshold', |
+ DEFAULT_SLICED_OBJECT_DOWNLOAD_THRESHOLD)) |
+ |
+ max_components = config.getint( |
+ 'GSUtil', 'sliced_object_download_max_components', |
+ DEFAULT_SLICED_OBJECT_DOWNLOAD_MAX_COMPONENTS) |
+ |
+ # Don't use sliced download if it will prevent us from performing an |
+ # integrity check. |
+ check_hashes_config = config.get( |
+ 'GSUtil', 'check_hashes', CHECK_HASH_IF_FAST_ELSE_FAIL) |
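+  # A sliced download can only be hash-validated via crc32c, since an md5 |
+  # cannot be computed from separately downloaded components, and fast crc32c |
+  # requires the compiled crcmod extension; otherwise slicing is allowed only |
+  # when hash checking is disabled entirely. |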
+ parallel_hashing = src_obj_metadata.crc32c and UsingCrcmodExtension(crcmod) |
+ hashing_okay = parallel_hashing or check_hashes_config == CHECK_HASH_NEVER |
+ |
+ use_slice = ( |
+ allow_splitting |
+ and download_strategy is not CloudApi.DownloadStrategy.ONE_SHOT |
+ and max_components > 1 |
+ and hashing_okay |
+ and sliced_object_download_threshold > 0 |
+ and src_obj_metadata.size >= sliced_object_download_threshold) |
+ |
+ if (not use_slice |
+ and src_obj_metadata.size >= PARALLEL_COMPOSITE_SUGGESTION_THRESHOLD |
+ and not UsingCrcmodExtension(crcmod) |
+ and check_hashes_config != CHECK_HASH_NEVER): |
+ with suggested_sliced_transfers_lock: |
+ if not suggested_sliced_transfers.get('suggested'): |
+ logger.info('\n'.join(textwrap.wrap( |
+ '==> NOTE: You are downloading one or more large file(s), which ' |
+ 'would run significantly faster if you enabled sliced object ' |
+            'downloads. This feature is enabled by default but requires that ' |
+ 'compiled crcmod be installed (see "gsutil help crcmod").')) + '\n') |
+ suggested_sliced_transfers['suggested'] = True |
+ |
+ return use_slice |
+ |
+ |
+def _PerformSlicedDownloadObjectToFile(cls, args, thread_state=None): |
+ """Function argument to Apply for performing sliced downloads. |
+ |
+ Args: |
+ cls: Calling Command class. |
+ args: PerformSlicedDownloadObjectToFileArgs tuple describing the target. |
+ thread_state: gsutil Cloud API instance to use for the operation. |
+ |
+ Returns: |
+ PerformSlicedDownloadReturnValues named-tuple filled with: |
+ component_num: The component number for this download. |
+ crc32c: CRC32C hash value (integer) of the downloaded bytes. |
+ bytes_transferred: The number of bytes transferred, potentially less |
+ than the component size if the download was resumed. |
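+      server_encoding: Content-encoding string if it was detected that the |
+                       server sent encoded bytes during transfer, None |
+                       otherwise. |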
+ """ |
+ gsutil_api = GetCloudApiInstance(cls, thread_state=thread_state) |
hash_algs = GetDownloadHashAlgs( |
- logger, src_has_md5=src_obj_metadata.md5Hash, |
- src_has_crc32c=src_obj_metadata.crc32c) |
+ cls.logger, consider_crc32c=args.src_obj_metadata.crc32c) |
digesters = dict((alg, hash_algs[alg]()) for alg in hash_algs or {}) |
- fp = None |
- # Tracks whether the server used a gzip encoding. |
- server_encoding = None |
- download_complete = False |
- download_strategy = _SelectDownloadStrategy(dst_url) |
- download_start_point = 0 |
- # This is used for resuming downloads, but also for passing the mediaLink |
- # and size into the download for new downloads so that we can avoid |
- # making an extra HTTP call. |
- serialization_data = None |
- serialization_dict = GetDownloadSerializationDict(src_obj_metadata) |
- open_files = [] |
+ (bytes_transferred, server_encoding) = ( |
+ _DownloadObjectToFileResumable(args.src_url, args.src_obj_metadata, |
+ args.dst_url, args.download_file_name, |
+ gsutil_api, cls.logger, digesters, |
+ component_num=args.component_num, |
+ start_byte=args.start_byte, |
+ end_byte=args.end_byte)) |
+ |
+ crc32c_val = None |
+ if 'crc32c' in digesters: |
+ crc32c_val = digesters['crc32c'].crcValue |
+ return PerformSlicedDownloadReturnValues( |
+ args.component_num, crc32c_val, bytes_transferred, server_encoding) |
+ |
+ |
+def _MaintainSlicedDownloadTrackerFiles(src_obj_metadata, dst_url, |
+ download_file_name, logger, |
+ api_selector, num_components): |
+ """Maintains sliced download tracker files in order to permit resumability. |
+ |
+ Reads or creates a sliced download tracker file representing this object |
+ download. Upon an attempt at cross-process resumption, the contents of the |
+ sliced download tracker file are verified to make sure a resumption is |
+ possible and appropriate. In the case that a resumption should not be |
+ attempted, existing component tracker files are deleted (to prevent child |
+ processes from attempting resumption), and a new sliced download tracker |
+ file is created. |
+ |
+ Args: |
+ src_obj_metadata: Metadata from the source object. Must include etag and |
+ generation. |
+ dst_url: Destination FileUrl. |
+ download_file_name: Temporary file name to be used for the download. |
+ logger: for outputting log messages. |
+ api_selector: The Cloud API implementation used. |
+ num_components: The number of components to perform this download with. |
+ """ |
+ assert src_obj_metadata.etag |
+  tracker_file = None |
+  fp = None |
+ |
+ # Only can happen if the resumable threshold is set higher than the |
+ # parallel transfer threshold. |
+ if src_obj_metadata.size < ResumableThreshold(): |
+ return |
+ |
+ tracker_file_name = GetTrackerFilePath(dst_url, |
+ TrackerFileType.SLICED_DOWNLOAD, |
+ api_selector) |
+ |
+ # Check to see if we should attempt resuming the download. |
try: |
- if download_strategy is CloudApi.DownloadStrategy.ONE_SHOT: |
- fp = open(download_file_name, 'wb') |
- elif download_strategy is CloudApi.DownloadStrategy.RESUMABLE: |
- # If this is a resumable download, we need to open the file for append and |
- # manage a tracker file. |
- if open_files_map.get(download_file_name, False): |
- # Ensure another process/thread is not already writing to this file. |
- raise FileConcurrencySkipError |
- open_files.append(download_file_name) |
- open_files_map[download_file_name] = True |
- fp = open(download_file_name, 'ab') |
- |
- resuming = ReadOrCreateDownloadTrackerFile( |
- src_obj_metadata, dst_url, api_selector) |
- if resuming: |
- # Find out how far along we are so we can request the appropriate |
- # remaining range of the object. |
- existing_file_size = GetFileSize(fp, position_to_eof=True) |
- if existing_file_size > src_obj_metadata.size: |
- DeleteTrackerFile(GetTrackerFilePath( |
- dst_url, TrackerFileType.DOWNLOAD, api_selector)) |
- raise CommandException( |
- '%s is larger (%d) than %s (%d).\nDeleting tracker file, so ' |
- 'if you re-try this download it will start from scratch' % |
- (download_file_name, existing_file_size, src_url.object_name, |
- src_obj_metadata.size)) |
- else: |
- if existing_file_size == src_obj_metadata.size: |
- logger.info( |
- 'Download already complete for file %s, skipping download but ' |
- 'will run integrity checks.', download_file_name) |
- download_complete = True |
- else: |
- download_start_point = existing_file_size |
- serialization_dict['progress'] = download_start_point |
- logger.info('Resuming download for %s', src_url.url_string) |
- # Catch up our digester with the hash data. |
- if existing_file_size > TEN_MIB: |
- for alg_name in digesters: |
- logger.info( |
- 'Catching up %s for %s', alg_name, download_file_name) |
- with open(download_file_name, 'rb') as hash_fp: |
- while True: |
- data = hash_fp.read(DEFAULT_FILE_BUFFER_SIZE) |
- if not data: |
- break |
- for alg_name in digesters: |
- digesters[alg_name].update(data) |
+ fp = open(download_file_name, 'rb') |
+ existing_file_size = GetFileSize(fp) |
+ # A parallel resumption should be attempted only if the destination file |
+ # size is exactly the same as the source size and the tracker file matches. |
+ if existing_file_size == src_obj_metadata.size: |
+ tracker_file = open(tracker_file_name, 'r') |
+ tracker_file_data = json.load(tracker_file) |
+ if (tracker_file_data['etag'] == src_obj_metadata.etag and |
+ tracker_file_data['generation'] == src_obj_metadata.generation and |
+ tracker_file_data['num_components'] == num_components): |
+ return |
else: |
- # Starting a new download, blow away whatever is already there. |
- fp.truncate(0) |
- fp.seek(0) |
+ tracker_file.close() |
+ logger.warn('Sliced download tracker file doesn\'t match for ' |
+ 'download of %s. Restarting download from scratch.' % |
+ dst_url.object_name) |
+ |
+ except (IOError, ValueError) as e: |
+ # Ignore non-existent file (happens first time a download |
+ # is attempted on an object), but warn user for other errors. |
+ if isinstance(e, ValueError) or e.errno != errno.ENOENT: |
+ logger.warn('Couldn\'t read sliced download tracker file (%s): %s. ' |
+ 'Restarting download from scratch.' % |
+ (tracker_file_name, str(e))) |
+ finally: |
+ if fp: |
+ fp.close() |
+ if tracker_file: |
+ tracker_file.close() |
+ |
+ # Delete component tracker files to guarantee download starts from scratch. |
+ DeleteDownloadTrackerFiles(dst_url, api_selector) |
+ |
+ # Create a new sliced download tracker file to represent this download. |
+ try: |
+ with open(tracker_file_name, 'w') as tracker_file: |
+ tracker_file_data = {'etag': src_obj_metadata.etag, |
+ 'generation': src_obj_metadata.generation, |
+ 'num_components': num_components} |
+ tracker_file.write(json.dumps(tracker_file_data)) |
+ except IOError as e: |
+ RaiseUnwritableTrackerFileException(tracker_file_name, e.strerror) |
+ |
+ |
+class SlicedDownloadFileWrapper(object): |
+ """Wraps a file object to be used in GetObjectMedia for sliced downloads. |
+ In order to allow resumability, the file object used by each thread in a |
+ sliced object download should be wrapped using SlicedDownloadFileWrapper. |
+ Passing a SlicedDownloadFileWrapper object to GetObjectMedia will allow the |
+ download component tracker file for this component to be updated periodically, |
+ while the downloaded bytes are normally written to file. |
+ """ |
+ |
+ def __init__(self, fp, tracker_file_name, src_obj_metadata, start_byte, |
+ end_byte): |
+ """Initializes the SlicedDownloadFileWrapper. |
+ |
+ Args: |
+ fp: The already-open file object to be used for writing in |
+ GetObjectMedia. Data will be written to file starting at the current |
+ seek position. |
+ tracker_file_name: The name of the tracker file for this component. |
+ src_obj_metadata: Metadata from the source object. Must include etag and |
+ generation. |
+ start_byte: The first byte to be downloaded for this parallel component. |
+ end_byte: The last byte to be downloaded for this parallel component. |
+ """ |
+ self._orig_fp = fp |
+ self._tracker_file_name = tracker_file_name |
+ self._src_obj_metadata = src_obj_metadata |
+ self._last_tracker_file_byte = None |
+ self._start_byte = start_byte |
+ self._end_byte = end_byte |
+ |
+ def write(self, data): # pylint: disable=invalid-name |
+ current_file_pos = self._orig_fp.tell() |
+ assert (self._start_byte <= current_file_pos and |
+ current_file_pos + len(data) <= self._end_byte + 1) |
+ |
+ self._orig_fp.write(data) |
+ current_file_pos = self._orig_fp.tell() |
+ |
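+    # Throttle tracker file updates: record progress at most once per |
+    # TRACKERFILE_UPDATE_THRESHOLD bytes, and always once the final byte of |
+    # this component's range has been written. |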
+ threshold = TRACKERFILE_UPDATE_THRESHOLD |
+ if (self._last_tracker_file_byte is None or |
+ current_file_pos - self._last_tracker_file_byte > threshold or |
+ current_file_pos == self._end_byte + 1): |
+ WriteDownloadComponentTrackerFile( |
+ self._tracker_file_name, self._src_obj_metadata, current_file_pos) |
+ self._last_tracker_file_byte = current_file_pos |
+ |
+ def seek(self, offset, whence=os.SEEK_SET): # pylint: disable=invalid-name |
+ if whence == os.SEEK_END: |
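+      # SEEK_END is interpreted relative to the end of this component's byte |
+      # range rather than the end of the underlying file. |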
+ self._orig_fp.seek(offset + self._end_byte + 1) |
else: |
- raise CommandException('Invalid download strategy %s chosen for' |
- 'file %s' % (download_strategy, fp.name)) |
+ self._orig_fp.seek(offset, whence) |
+ assert self._start_byte <= self._orig_fp.tell() <= self._end_byte + 1 |
+ |
+ def tell(self): # pylint: disable=invalid-name |
+ return self._orig_fp.tell() |
+ |
+ def flush(self): # pylint: disable=invalid-name |
+ self._orig_fp.flush() |
+ |
+ def close(self): # pylint: disable=invalid-name |
+ if self._orig_fp: |
+ self._orig_fp.close() |
+ |
+ |
+def _PartitionObject(src_url, src_obj_metadata, dst_url, |
+ download_file_name): |
+ """Partitions an object into components to be downloaded. |
+ |
+ Each component is a byte range of the object. The byte ranges |
+ of the returned components are mutually exclusive and collectively |
+ exhaustive. The byte ranges are inclusive at both end points. |
+ |
+ Args: |
+ src_url: Source CloudUrl. |
+ src_obj_metadata: Metadata from the source object. |
+ dst_url: Destination FileUrl. |
+ download_file_name: Temporary file name to be used for the download. |
+ |
+ Returns: |
+ components_to_download: A list of PerformSlicedDownloadObjectToFileArgs |
+ to be used in Apply for the sliced download. |
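+    component_lengths: A list of the byte length of each component, indexed |
+                       by component number. |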
+ """ |
+ sliced_download_component_size = HumanReadableToBytes( |
+ config.get('GSUtil', 'sliced_object_download_component_size', |
+ DEFAULT_SLICED_OBJECT_DOWNLOAD_COMPONENT_SIZE)) |
+ |
+ max_components = config.getint( |
+ 'GSUtil', 'sliced_object_download_max_components', |
+ DEFAULT_SLICED_OBJECT_DOWNLOAD_MAX_COMPONENTS) |
+ |
+ num_components, component_size = _GetPartitionInfo( |
+ src_obj_metadata.size, max_components, sliced_download_component_size) |
+ |
+ components_to_download = [] |
+ component_lengths = [] |
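+  # Assign each component a contiguous byte range of component_size bytes; |
+  # the final component may be shorter if the object size is not an exact |
+  # multiple of the component size. |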
+ for i in range(num_components): |
+ start_byte = i * component_size |
+ end_byte = min((i + 1) * (component_size) - 1, src_obj_metadata.size - 1) |
+ component_lengths.append(end_byte - start_byte + 1) |
+ components_to_download.append( |
+ PerformSlicedDownloadObjectToFileArgs( |
+ i, src_url, src_obj_metadata, dst_url, download_file_name, |
+ start_byte, end_byte)) |
+ return components_to_download, component_lengths |
+ |
+ |
+def _DoSlicedDownload(src_url, src_obj_metadata, dst_url, download_file_name, |
+ command_obj, logger, copy_exception_handler, |
+ api_selector): |
+ """Downloads a cloud object to a local file using sliced download. |
+ |
+ Byte ranges are decided for each thread/process, and then the parts are |
+ downloaded in parallel. |
+ |
+ Args: |
+ src_url: Source CloudUrl. |
+ src_obj_metadata: Metadata from the source object. |
+ dst_url: Destination FileUrl. |
+ download_file_name: Temporary file name to be used for download. |
+    command_obj: command object for use in Apply in sliced object downloads. |
+ logger: for outputting log messages. |
+ copy_exception_handler: For handling copy exceptions during Apply. |
+ api_selector: The Cloud API implementation used. |
+ |
+ Returns: |
+ (bytes_transferred, crc32c) |
+ bytes_transferred: Number of bytes transferred from server this call. |
+ crc32c: a crc32c hash value (integer) for the downloaded bytes, or None if |
+ crc32c hashing wasn't performed. |
+ """ |
+ components_to_download, component_lengths = _PartitionObject( |
+ src_url, src_obj_metadata, dst_url, download_file_name) |
+ |
+ num_components = len(components_to_download) |
+ _MaintainSlicedDownloadTrackerFiles(src_obj_metadata, dst_url, |
+ download_file_name, logger, |
+ api_selector, num_components) |
+ |
+ # Resize the download file so each child process can seek to its start byte. |
+ with open(download_file_name, 'ab') as fp: |
+ fp.truncate(src_obj_metadata.size) |
+ |
+ cp_results = command_obj.Apply( |
+ _PerformSlicedDownloadObjectToFile, components_to_download, |
+ copy_exception_handler, arg_checker=gslib.command.DummyArgChecker, |
+ parallel_operations_override=True, should_return_results=True) |
+ |
+ if len(cp_results) < num_components: |
+ raise CommandException( |
+ 'Some components of %s were not downloaded successfully. ' |
+ 'Please retry this download.' % dst_url.object_name) |
+ |
+ # Crc32c hashes have to be concatenated in the correct order. |
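+  # Each subsequent component checksum is folded into the running value with |
+  # ConcatCrc32c, which also requires the length of the appended component. |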
+ cp_results = sorted(cp_results, key=attrgetter('component_num')) |
+ crc32c = cp_results[0].crc32c |
+ if crc32c is not None: |
+ for i in range(1, num_components): |
+ crc32c = ConcatCrc32c(crc32c, cp_results[i].crc32c, |
+ component_lengths[i]) |
+ |
+ bytes_transferred = 0 |
+ expect_gzip = (src_obj_metadata.contentEncoding and |
+ src_obj_metadata.contentEncoding.lower().endswith('gzip')) |
+ for cp_result in cp_results: |
+ bytes_transferred += cp_result.bytes_transferred |
+ server_gzip = (cp_result.server_encoding and |
+ cp_result.server_encoding.lower().endswith('gzip')) |
+ # If the server gzipped any components on the fly, we will have no chance of |
+ # properly reconstructing the file. |
+ if server_gzip and not expect_gzip: |
+ raise CommandException( |
+ 'Download of %s failed because the server sent back data with an ' |
+ 'unexpected encoding.' % dst_url.object_name) |
+ |
+ return bytes_transferred, crc32c |
+ |
- if not dst_url.IsStream(): |
- serialization_data = json.dumps(serialization_dict) |
+def _DownloadObjectToFileResumable(src_url, src_obj_metadata, dst_url, |
+ download_file_name, gsutil_api, logger, |
+ digesters, component_num=None, start_byte=0, |
+ end_byte=None): |
+ """Downloads an object to a local file using the resumable strategy. |
+ |
+ Args: |
+ src_url: Source CloudUrl. |
+ src_obj_metadata: Metadata from the source object. |
+ dst_url: Destination FileUrl. |
+ download_file_name: Temporary file name to be used for download. |
+ gsutil_api: gsutil Cloud API instance to use for the download. |
+ logger: for outputting log messages. |
+ digesters: Digesters corresponding to the hash algorithms that will be used |
+ for validation. |
+ component_num: Which component of a sliced download this call is for, or |
+ None if this is not a sliced download. |
+ start_byte: The first byte of a byte range for a sliced download. |
+ end_byte: The last byte of a byte range for a sliced download. |
+ |
+ Returns: |
+ (bytes_transferred, server_encoding) |
+ bytes_transferred: Number of bytes transferred from server this call. |
+ server_encoding: Content-encoding string if it was detected that the server |
+ sent encoded bytes during transfer, None otherwise. |
+ """ |
+ if end_byte is None: |
+ end_byte = src_obj_metadata.size - 1 |
+ download_size = end_byte - start_byte + 1 |
+ |
+ is_sliced = component_num is not None |
+ api_selector = gsutil_api.GetApiSelector(provider=src_url.scheme) |
+  server_encoding = None |
+  fp = None |
+ |
+ # Used for logging |
+ download_name = dst_url.object_name |
+ if is_sliced: |
+ download_name += ' component %d' % component_num |
+ |
+ try: |
+ fp = open(download_file_name, 'r+b') |
+ fp.seek(start_byte) |
+ existing_file_size = GetFileSize(fp) |
+ |
+ tracker_file_name, download_start_byte = ( |
+ ReadOrCreateDownloadTrackerFile(src_obj_metadata, dst_url, logger, |
+ api_selector, start_byte, |
+ existing_file_size, component_num)) |
+ |
+ if download_start_byte < start_byte or download_start_byte > end_byte + 1: |
+ DeleteTrackerFile(tracker_file_name) |
+ raise CommandException( |
+ 'Resumable download start point for %s is not in the correct byte ' |
+ 'range. Deleting tracker file, so if you re-try this download it ' |
+ 'will start from scratch' % download_name) |
+ |
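+    # download_start_byte comes from the tracker file: it equals start_byte |
+    # for a fresh download and start_byte + download_size once every byte in |
+    # this component's range has already been written. |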
+ download_complete = (download_start_byte == start_byte + download_size) |
+ resuming = (download_start_byte != start_byte) and not download_complete |
+ if resuming: |
+ logger.info('Resuming download for %s', download_name) |
+ elif download_complete: |
+ logger.info( |
+ 'Download already complete for %s, skipping download but ' |
+ 'will run integrity checks.', download_name) |
+ |
+ # This is used for resuming downloads, but also for passing the mediaLink |
+ # and size into the download for new downloads so that we can avoid |
+ # making an extra HTTP call. |
+ serialization_data = GetDownloadSerializationData( |
+ src_obj_metadata, progress=download_start_byte) |
+ |
+ if resuming or download_complete: |
+ # Catch up our digester with the hash data. |
+ bytes_digested = 0 |
+ total_bytes_to_digest = download_start_byte - start_byte |
+ hash_callback = ProgressCallbackWithBackoff( |
+ total_bytes_to_digest, |
+ FileProgressCallbackHandler( |
+ ConstructAnnounceText('Hashing', |
+ dst_url.url_string), logger).call) |
+ |
+ while bytes_digested < total_bytes_to_digest: |
+ bytes_to_read = min(DEFAULT_FILE_BUFFER_SIZE, |
+ total_bytes_to_digest - bytes_digested) |
+ data = fp.read(bytes_to_read) |
+ bytes_digested += bytes_to_read |
+ for alg_name in digesters: |
+ digesters[alg_name].update(data) |
+ hash_callback.Progress(len(data)) |
+ |
+ elif not is_sliced: |
+ # Delete file contents and start entire object download from scratch. |
+ fp.truncate(0) |
+ existing_file_size = 0 |
progress_callback = FileProgressCallbackHandler( |
- ConstructAnnounceText('Downloading', dst_url.url_string), |
- logger).call |
+ ConstructAnnounceText('Downloading', dst_url.url_string), logger, |
+ start_byte, download_size).call |
+ |
if global_copy_helper_opts.test_callback_file: |
with open(global_copy_helper_opts.test_callback_file, 'rb') as test_fp: |
progress_callback = pickle.loads(test_fp.read()).call |
- start_time = time.time() |
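+    # For sliced downloads, wrap the file object so that component progress |
+    # is periodically checkpointed to the component tracker file as bytes are |
+    # written, enabling cross-process resumption. |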
+ if is_sliced and src_obj_metadata.size >= ResumableThreshold(): |
+ fp = SlicedDownloadFileWrapper(fp, tracker_file_name, src_obj_metadata, |
+ start_byte, end_byte) |
+ |
# TODO: With gzip encoding (which may occur on-the-fly and not be part of |
# the object's metadata), when we request a range to resume, it's possible |
# that the server will just resend the entire object, which means our |
@@ -1811,58 +2175,186 @@ def _DownloadObjectToFile(src_url, src_obj_metadata, dst_url, |
# the local file in the case of a failed gzip hash anyway, but it would |
# be better if we actively detected this case. |
if not download_complete: |
+ fp.seek(download_start_byte) |
server_encoding = gsutil_api.GetObjectMedia( |
src_url.bucket_name, src_url.object_name, fp, |
- start_byte=download_start_point, generation=src_url.generation, |
- object_size=src_obj_metadata.size, |
- download_strategy=download_strategy, provider=src_url.scheme, |
- serialization_data=serialization_data, digesters=digesters, |
- progress_callback=progress_callback) |
- |
- end_time = time.time() |
- |
- # If a custom test method is defined, call it here. For the copy command, |
- # test methods are expected to take one argument: an open file pointer, |
- # and are used to perturb the open file during download to exercise |
- # download error detection. |
- if test_method: |
- test_method(fp) |
+ start_byte=download_start_byte, end_byte=end_byte, |
+ generation=src_url.generation, object_size=src_obj_metadata.size, |
+ download_strategy=CloudApi.DownloadStrategy.RESUMABLE, |
+ provider=src_url.scheme, serialization_data=serialization_data, |
+ digesters=digesters, progress_callback=progress_callback) |
except ResumableDownloadException as e: |
- logger.warning('Caught ResumableDownloadException (%s) for file %s.', |
- e.reason, file_name) |
+ logger.warning('Caught ResumableDownloadException (%s) for download of %s.', |
+ e.reason, download_name) |
raise |
+ finally: |
+ if fp: |
+ fp.close() |
+ |
+ bytes_transferred = end_byte - download_start_byte + 1 |
+ return bytes_transferred, server_encoding |
+ |
+def _DownloadObjectToFileNonResumable(src_url, src_obj_metadata, dst_url, |
+ download_file_name, gsutil_api, logger, |
+ digesters): |
+ """Downloads an object to a local file using the non-resumable strategy. |
+ |
+ Args: |
+ src_url: Source CloudUrl. |
+ src_obj_metadata: Metadata from the source object. |
+ dst_url: Destination FileUrl. |
+ download_file_name: Temporary file name to be used for download. |
+ gsutil_api: gsutil Cloud API instance to use for the download. |
+ logger: for outputting log messages. |
+ digesters: Digesters corresponding to the hash algorithms that will be used |
+ for validation. |
+ Returns: |
+ (bytes_transferred, server_encoding) |
+ bytes_transferred: Number of bytes transferred from server this call. |
+ server_encoding: Content-encoding string if it was detected that the server |
+ sent encoded bytes during transfer, None otherwise. |
+ """ |
+  fp = None |
+  try: |
+ fp = open(download_file_name, 'w') |
+ |
+ # This is used to pass the mediaLink and the size into the download so that |
+ # we can avoid making an extra HTTP call. |
+ serialization_data = GetDownloadSerializationData(src_obj_metadata) |
+ |
+ progress_callback = FileProgressCallbackHandler( |
+ ConstructAnnounceText('Downloading', dst_url.url_string), logger).call |
+ |
+ if global_copy_helper_opts.test_callback_file: |
+ with open(global_copy_helper_opts.test_callback_file, 'rb') as test_fp: |
+ progress_callback = pickle.loads(test_fp.read()).call |
+ |
+ server_encoding = gsutil_api.GetObjectMedia( |
+ src_url.bucket_name, src_url.object_name, fp, |
+ generation=src_url.generation, object_size=src_obj_metadata.size, |
+ download_strategy=CloudApi.DownloadStrategy.ONE_SHOT, |
+ provider=src_url.scheme, serialization_data=serialization_data, |
+ digesters=digesters, progress_callback=progress_callback) |
finally: |
if fp: |
fp.close() |
- for file_name in open_files: |
- open_files_map.delete(file_name) |
- |
- # If we decompressed a content-encoding gzip file on the fly, this may not |
- # be accurate, but it is the best we can do without going deep into the |
- # underlying HTTP libraries. Note that this value is only used for |
- # reporting in log messages; inaccuracy doesn't impact the integrity of the |
- # download. |
- bytes_transferred = src_obj_metadata.size - download_start_point |
+ |
+ return src_obj_metadata.size, server_encoding |
+ |
+ |
+def _DownloadObjectToFile(src_url, src_obj_metadata, dst_url, |
+ gsutil_api, logger, command_obj, |
+ copy_exception_handler, allow_splitting=True): |
+ """Downloads an object to a local file. |
+ |
+ Args: |
+ src_url: Source CloudUrl. |
+ src_obj_metadata: Metadata from the source object. |
+ dst_url: Destination FileUrl. |
+ gsutil_api: gsutil Cloud API instance to use for the download. |
+ logger: for outputting log messages. |
+ command_obj: command object for use in Apply in sliced downloads. |
+ copy_exception_handler: For handling copy exceptions during Apply. |
+ allow_splitting: Whether or not to allow sliced download. |
+ Returns: |
+ (elapsed_time, bytes_transferred, dst_url, md5), where time elapsed |
+ excludes initial GET. |
+ |
+ Raises: |
+ FileConcurrencySkipError: if this download is already in progress. |
+ CommandException: if other errors encountered. |
+ """ |
+ global open_files_map, open_files_lock |
+ if dst_url.object_name.endswith(dst_url.delim): |
+ logger.warn('\n'.join(textwrap.wrap( |
+ 'Skipping attempt to download to filename ending with slash (%s). This ' |
+ 'typically happens when using gsutil to download from a subdirectory ' |
+ 'created by the Cloud Console (https://cloud.google.com/console)' |
+ % dst_url.object_name))) |
+ return (0, 0, dst_url, '') |
+ |
+ api_selector = gsutil_api.GetApiSelector(provider=src_url.scheme) |
+ download_strategy = _SelectDownloadStrategy(dst_url) |
+ sliced_download = _ShouldDoSlicedDownload( |
+ download_strategy, src_obj_metadata, allow_splitting, logger) |
+ |
+ download_file_name, need_to_unzip = _GetDownloadFile( |
+ dst_url, src_obj_metadata, logger) |
+ |
+ # Ensure another process/thread is not already writing to this file. |
+ with open_files_lock: |
+ if open_files_map.get(download_file_name, False): |
+ raise FileConcurrencySkipError |
+ open_files_map[download_file_name] = True |
+ |
+ # Set up hash digesters. |
+ consider_md5 = src_obj_metadata.md5Hash and not sliced_download |
+ hash_algs = GetDownloadHashAlgs(logger, consider_md5=consider_md5, |
+ consider_crc32c=src_obj_metadata.crc32c) |
+ digesters = dict((alg, hash_algs[alg]()) for alg in hash_algs or {}) |
+ |
+ # Tracks whether the server used a gzip encoding. |
+ server_encoding = None |
+ download_complete = (src_obj_metadata.size == 0) |
+ bytes_transferred = 0 |
+ |
+ start_time = time.time() |
+ if not download_complete: |
+ if sliced_download: |
+ (bytes_transferred, crc32c) = ( |
+ _DoSlicedDownload(src_url, src_obj_metadata, dst_url, |
+ download_file_name, command_obj, logger, |
+ copy_exception_handler, api_selector)) |
+ if 'crc32c' in digesters: |
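+        # Seed the crc32c digester with the checksum assembled from the |
+        # per-component values so the standard validation path below can |
+        # use it. |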
+ digesters['crc32c'].crcValue = crc32c |
+ elif download_strategy is CloudApi.DownloadStrategy.ONE_SHOT: |
+ (bytes_transferred, server_encoding) = ( |
+ _DownloadObjectToFileNonResumable(src_url, src_obj_metadata, dst_url, |
+ download_file_name, gsutil_api, |
+ logger, digesters)) |
+ elif download_strategy is CloudApi.DownloadStrategy.RESUMABLE: |
+ (bytes_transferred, server_encoding) = ( |
+ _DownloadObjectToFileResumable(src_url, src_obj_metadata, dst_url, |
+ download_file_name, gsutil_api, logger, |
+ digesters)) |
+ else: |
+      raise CommandException('Invalid download strategy %s chosen for ' |
+ 'file %s' % (download_strategy, |
+ download_file_name)) |
+ end_time = time.time() |
+ |
server_gzip = server_encoding and server_encoding.lower().endswith('gzip') |
- local_md5 = _ValidateDownloadHashes(logger, src_url, src_obj_metadata, |
- dst_url, need_to_unzip, server_gzip, |
- digesters, hash_algs, api_selector, |
- bytes_transferred) |
+ local_md5 = _ValidateAndCompleteDownload( |
+ logger, src_url, src_obj_metadata, dst_url, need_to_unzip, server_gzip, |
+ digesters, hash_algs, download_file_name, api_selector, bytes_transferred) |
+ |
+ with open_files_lock: |
+ open_files_map.delete(download_file_name) |
return (end_time - start_time, bytes_transferred, dst_url, local_md5) |
-def _GetDownloadZipFileName(file_name): |
- """Returns the file name for a temporarily compressed downloaded file.""" |
- return '%s_.gztmp' % file_name |
+def _GetDownloadTempZipFileName(dst_url): |
+ """Returns temporary file name for a temporarily compressed download.""" |
+ return '%s_.gztmp' % dst_url.object_name |
+ |
+ |
+def _GetDownloadTempFileName(dst_url): |
+ """Returns temporary download file name for uncompressed downloads.""" |
+ return '%s_.gstmp' % dst_url.object_name |
-def _ValidateDownloadHashes(logger, src_url, src_obj_metadata, dst_url, |
- need_to_unzip, server_gzip, digesters, hash_algs, |
- api_selector, bytes_transferred): |
- """Validates a downloaded file's integrity. |
+def _ValidateAndCompleteDownload(logger, src_url, src_obj_metadata, dst_url, |
+ need_to_unzip, server_gzip, digesters, |
+ hash_algs, download_file_name, |
+ api_selector, bytes_transferred): |
+ """Validates and performs necessary operations on a downloaded file. |
+ |
+ Validates the integrity of the downloaded file using hash_algs. If the file |
+ was compressed (temporarily), the file will be decompressed. Then, if the |
+ integrity of the file was successfully validated, the file will be moved |
+ from its temporary download location to its permanent location on disk. |
Args: |
logger: For outputting log messages. |
@@ -1880,6 +2372,7 @@ def _ValidateDownloadHashes(logger, src_url, src_obj_metadata, dst_url, |
hash must be recomputed from the local file. |
hash_algs: dict of {string, hash algorithm} that can be used if digesters |
don't have up-to-date digests. |
+ download_file_name: Temporary file name that was used for download. |
api_selector: The Cloud API implementation used (used tracker file naming). |
bytes_transferred: Number of bytes downloaded (used for logging). |
@@ -1887,10 +2380,10 @@ def _ValidateDownloadHashes(logger, src_url, src_obj_metadata, dst_url, |
An MD5 of the local file, if one was calculated as part of the integrity |
check. |
""" |
- file_name = dst_url.object_name |
- download_file_name = (_GetDownloadZipFileName(file_name) if need_to_unzip else |
- file_name) |
+ final_file_name = dst_url.object_name |
+ file_name = download_file_name |
digesters_succeeded = True |
+ |
for alg in digesters: |
# If we get a digester with a None algorithm, the underlying |
# implementation failed to calculate a digest, so we will need to |
@@ -1903,15 +2396,14 @@ def _ValidateDownloadHashes(logger, src_url, src_obj_metadata, dst_url, |
local_hashes = _CreateDigestsFromDigesters(digesters) |
else: |
local_hashes = _CreateDigestsFromLocalFile( |
- logger, hash_algs, download_file_name, src_obj_metadata) |
+ logger, hash_algs, file_name, final_file_name, src_obj_metadata) |
digest_verified = True |
hash_invalid_exception = None |
try: |
- _CheckHashes(logger, src_url, src_obj_metadata, download_file_name, |
+ _CheckHashes(logger, src_url, src_obj_metadata, final_file_name, |
local_hashes) |
- DeleteTrackerFile(GetTrackerFilePath( |
- dst_url, TrackerFileType.DOWNLOAD, api_selector)) |
+ DeleteDownloadTrackerFiles(dst_url, api_selector) |
except HashMismatchException, e: |
# If an non-gzipped object gets sent with gzip content encoding, the hash |
# we calculate will match the gzipped bytes, not the original object. Thus, |
@@ -1929,34 +2421,27 @@ def _ValidateDownloadHashes(logger, src_url, src_obj_metadata, dst_url, |
hash_invalid_exception = e |
digest_verified = False |
else: |
- DeleteTrackerFile(GetTrackerFilePath( |
- dst_url, TrackerFileType.DOWNLOAD, api_selector)) |
+ DeleteDownloadTrackerFiles(dst_url, api_selector) |
if _RENAME_ON_HASH_MISMATCH: |
- os.rename(download_file_name, |
- download_file_name + _RENAME_ON_HASH_MISMATCH_SUFFIX) |
+ os.rename(file_name, |
+ final_file_name + _RENAME_ON_HASH_MISMATCH_SUFFIX) |
else: |
- os.unlink(download_file_name) |
+ os.unlink(file_name) |
raise |
- if server_gzip and not need_to_unzip: |
- # Server compressed bytes on-the-fly, thus we need to rename and decompress. |
- # We can't decompress on-the-fly because prior to Python 3.2 the gzip |
- # module makes a bunch of seek calls on the stream. |
- download_file_name = _GetDownloadZipFileName(file_name) |
- os.rename(file_name, download_file_name) |
- |
if need_to_unzip or server_gzip: |
# Log that we're uncompressing if the file is big enough that |
# decompressing would make it look like the transfer "stalled" at the end. |
if bytes_transferred > TEN_MIB: |
logger.info( |
- 'Uncompressing downloaded tmp file to %s...', file_name) |
+ 'Uncompressing temporarily gzipped file to %s...', final_file_name) |
- # Downloaded gzipped file to a filename w/o .gz extension, so unzip. |
gzip_fp = None |
try: |
- gzip_fp = gzip.open(download_file_name, 'rb') |
- with open(file_name, 'wb') as f_out: |
+ # Downloaded temporarily gzipped file, unzip to file without '_.gztmp' |
+ # suffix. |
+ gzip_fp = gzip.open(file_name, 'rb') |
+ with open(final_file_name, 'wb') as f_out: |
data = gzip_fp.read(GZIP_CHUNK_SIZE) |
while data: |
f_out.write(data) |
@@ -1972,19 +2457,19 @@ def _ValidateDownloadHashes(logger, src_url, src_obj_metadata, dst_url, |
if gzip_fp: |
gzip_fp.close() |
- os.unlink(download_file_name) |
+ os.unlink(file_name) |
+ file_name = final_file_name |
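The block above streams the temporarily gzipped download into the final file in fixed-size chunks rather than reading it all at once. A self-contained sketch of the same pattern, with an assumed chunk size standing in for GZIP_CHUNK_SIZE:

import gzip

CHUNK_SIZE = 8192  # assumed stand-in for GZIP_CHUNK_SIZE; the real value may differ

def decompress_in_chunks(gzipped_path, final_path, chunk_size=CHUNK_SIZE):
  """Streams a gzipped file into its decompressed destination chunk by chunk."""
  gzip_fp = None
  try:
    gzip_fp = gzip.open(gzipped_path, 'rb')
    with open(final_path, 'wb') as f_out:
      data = gzip_fp.read(chunk_size)
      while data:
        f_out.write(data)
        data = gzip_fp.read(chunk_size)
  finally:
    if gzip_fp:
      gzip_fp.close()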
if not digest_verified: |
try: |
# Recalculate hashes on the unzipped local file. |
- local_hashes = _CreateDigestsFromLocalFile(logger, hash_algs, file_name, |
- src_obj_metadata) |
- _CheckHashes(logger, src_url, src_obj_metadata, file_name, local_hashes) |
- DeleteTrackerFile(GetTrackerFilePath( |
- dst_url, TrackerFileType.DOWNLOAD, api_selector)) |
+ local_hashes = _CreateDigestsFromLocalFile( |
+ logger, hash_algs, file_name, final_file_name, src_obj_metadata) |
+ _CheckHashes(logger, src_url, src_obj_metadata, final_file_name, |
+ local_hashes) |
+ DeleteDownloadTrackerFiles(dst_url, api_selector) |
except HashMismatchException: |
- DeleteTrackerFile(GetTrackerFilePath( |
- dst_url, TrackerFileType.DOWNLOAD, api_selector)) |
+ DeleteDownloadTrackerFiles(dst_url, api_selector) |
if _RENAME_ON_HASH_MISMATCH: |
os.rename(file_name, |
file_name + _RENAME_ON_HASH_MISMATCH_SUFFIX) |
@@ -1992,6 +2477,13 @@ def _ValidateDownloadHashes(logger, src_url, src_obj_metadata, dst_url, |
os.unlink(file_name) |
raise |
+ if file_name != final_file_name: |
+ # Data is still in a temporary file, so move it to a permanent location. |
+ if os.path.exists(final_file_name): |
+ os.unlink(final_file_name) |
+ os.rename(file_name, final_file_name)
+ |
if 'md5' in local_hashes: |
return local_hashes['md5'] |
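Taken together, the function's happy path is: check digests against the object metadata, decompress if the bytes were downloaded gzipped, and only then move the temporary file into its final location. A heavily simplified sketch of that ordering (the callables and names here are hypothetical placeholders; tracker-file cleanup, server-side gzip handling, and re-hashing after decompression are omitted):

import os

def finalize_download(temp_path, final_path, expected_hashes, compute_hashes,
                      decompress, was_gzipped):
  # 1. Validate integrity before touching the final location.
  if compute_hashes(temp_path) != expected_hashes:
    os.unlink(temp_path)
    raise ValueError('hash mismatch for %s' % final_path)
  if was_gzipped:
    # 2. Decompress the temporarily gzipped bytes straight into the final file.
    decompress(temp_path, final_path)
    os.unlink(temp_path)
  else:
    # 3. Otherwise replace any stale final file and move the temp file into place.
    if os.path.exists(final_path):
      os.unlink(final_path)
    os.rename(temp_path, final_path)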
@@ -2123,7 +2615,7 @@ def _CopyObjToObjDaisyChainMode(src_url, src_obj_metadata, dst_url, |
# pylint: disable=too-many-statements |
def PerformCopy(logger, src_url, dst_url, gsutil_api, command_obj, |
copy_exception_handler, allow_splitting=True, |
- headers=None, manifest=None, gzip_exts=None, test_method=None): |
+ headers=None, manifest=None, gzip_exts=None): |
"""Performs copy from src_url to dst_url, handling various special cases. |
Args: |
@@ -2131,14 +2623,14 @@ def PerformCopy(logger, src_url, dst_url, gsutil_api, command_obj, |
src_url: Source StorageUrl. |
dst_url: Destination StorageUrl. |
gsutil_api: gsutil Cloud API instance to use for the copy. |
- command_obj: command object for use in Apply in parallel composite uploads. |
+ command_obj: command object for use in Apply in parallel composite uploads |
+ and sliced object downloads. |
copy_exception_handler: for handling copy exceptions during Apply. |
allow_splitting: Whether to allow the file to be split into component |
- pieces for an parallel composite upload. |
+ pieces for a parallel composite upload or download.
headers: optional headers to use for the copy operation. |
manifest: optional manifest for tracking copy operations. |
gzip_exts: List of file extensions to gzip for uploads, if any. |
- test_method: optional test method for modifying files during unit tests. |
Returns: |
(elapsed_time, bytes_transferred, version-specific dst_url) excluding |
@@ -2194,7 +2686,7 @@ def PerformCopy(logger, src_url, dst_url, gsutil_api, command_obj, |
else: |
# Just get the fields needed to validate the download. |
src_obj_fields = ['crc32c', 'contentEncoding', 'contentType', 'etag', |
- 'mediaLink', 'md5Hash', 'size'] |
+ 'mediaLink', 'md5Hash', 'size', 'generation'] |
if (src_url.scheme == 's3' and |
global_copy_helper_opts.skip_unsupported_objects): |
@@ -2231,8 +2723,14 @@ def PerformCopy(logger, src_url, dst_url, gsutil_api, command_obj, |
try: |
src_obj_filestream = GetStreamFromFileUrl(src_url) |
except Exception, e: # pylint: disable=broad-except |
- raise CommandException('Error opening file "%s": %s.' % (src_url, |
- e.message)) |
+ if command_obj.continue_on_error: |
+ message = 'Error copying %s: %s' % (src_url, str(e)) |
+ command_obj.op_failure_count += 1 |
+ logger.error(message) |
+ return |
+ else: |
+ raise CommandException('Error opening file "%s": %s.' % (src_url, |
+ e.message)) |
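The new branch above downgrades a failure to open a local source from a fatal CommandException to a logged, counted error when the command is running with continue-on-error semantics. A hedged, stand-alone sketch of that pattern (FakeCommand and its fields are stand-ins for the real command object):

import logging

logger = logging.getLogger(__name__)

class FakeCommand(object):
  """Stand-in carrying only the fields the sketch needs."""
  continue_on_error = True
  op_failure_count = 0

def open_source_or_skip(command_obj, path):
  try:
    return open(path, 'rb')
  except Exception as e:  # broad catch mirrors the behavior in the diff
    if command_obj.continue_on_error:
      command_obj.op_failure_count += 1
      logger.error('Error copying %s: %s', path, e)
      return None  # caller treats None as "skip this source"
    raise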
if src_url.IsStream(): |
src_obj_size = None |
else: |
@@ -2308,7 +2806,9 @@ def PerformCopy(logger, src_url, dst_url, gsutil_api, command_obj, |
if src_url.IsCloudUrl(): |
if dst_url.IsFileUrl(): |
return _DownloadObjectToFile(src_url, src_obj_metadata, dst_url, |
- gsutil_api, logger, test_method=test_method) |
+ gsutil_api, logger, command_obj, |
+ copy_exception_handler, |
+ allow_splitting=allow_splitting) |
elif copy_in_the_cloud: |
return _CopyObjToObjInTheCloud(src_url, src_obj_metadata, dst_url, |
dst_obj_metadata, preconditions, |
@@ -2499,27 +2999,8 @@ def GetPathBeforeFinalDir(url): |
return url.url_string.rstrip(sep).rpartition(sep)[0] |
-def _DivideAndCeil(dividend, divisor): |
- """Returns ceil(dividend / divisor). |
- |
- Takes care to avoid the pitfalls of floating point arithmetic that could |
- otherwise yield the wrong result for large numbers. |
- |
- Args: |
- dividend: Dividend for the operation. |
- divisor: Divisor for the operation. |
- |
- Returns: |
- Quotient. |
- """ |
- quotient = dividend // divisor |
- if (dividend % divisor) != 0: |
- quotient += 1 |
- return quotient |
- |
- |
def _GetPartitionInfo(file_size, max_components, default_component_size): |
- """Gets info about a file partition for parallel composite uploads. |
+ """Gets info about a file partition for parallel file/object transfers. |
Args: |
file_size: The number of bytes in the file to be partitioned. |
@@ -2532,22 +3013,29 @@ def _GetPartitionInfo(file_size, max_components, default_component_size): |
file_size != 0 (mod num_components)). |
""" |
# num_components = ceil(file_size / default_component_size) |
- num_components = _DivideAndCeil(file_size, default_component_size) |
+ num_components = DivideAndCeil(file_size, default_component_size) |
# num_components must be in the range [2, max_components] |
num_components = max(min(num_components, max_components), 2) |
# component_size = ceil(file_size / num_components) |
- component_size = _DivideAndCeil(file_size, num_components) |
+ component_size = DivideAndCeil(file_size, num_components) |
return (num_components, component_size) |
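A worked sketch of the partitioning math. DivideAndCeil is assumed here to be plain integer ceiling division, as the removed _DivideAndCeil was; the byte counts in the usage comment are illustrative only.

def divide_and_ceil(dividend, divisor):
  # Integer ceiling division, avoiding floating-point rounding entirely.
  quotient, remainder = divmod(dividend, divisor)
  return quotient + 1 if remainder else quotient

def get_partition_info(file_size, max_components, default_component_size):
  num_components = divide_and_ceil(file_size, default_component_size)
  num_components = max(min(num_components, max_components), 2)
  component_size = divide_and_ceil(file_size, num_components)
  return num_components, component_size

# e.g. a 1 GiB file with at most 4 components and a 150 MiB default size:
# ceil(1024/150) = 7, clamped to 4 components of ceil(1 GiB / 4) = 256 MiB each.
print(get_partition_info(1024 * 2**20, 4, 150 * 2**20))  # (4, 268435456)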
-def _DeleteObjectFn(cls, url_to_delete, thread_state=None): |
- """Wrapper function to be used with command.Apply().""" |
+def _DeleteTempComponentObjectFn(cls, url_to_delete, thread_state=None): |
+ """Wrapper func to be used with command.Apply to delete temporary objects.""" |
gsutil_api = GetCloudApiInstance(cls, thread_state) |
- gsutil_api.DeleteObject( |
- url_to_delete.bucket_name, url_to_delete.object_name, |
- generation=url_to_delete.generation, provider=url_to_delete.scheme) |
+ try: |
+ gsutil_api.DeleteObject( |
+ url_to_delete.bucket_name, url_to_delete.object_name, |
+ generation=url_to_delete.generation, provider=url_to_delete.scheme) |
+ except NotFoundException: |
+ # The temporary object could already be gone if a retry was |
+ # issued at a lower layer but the original request succeeded. |
+ # Barring other errors, the top-level command should still report success, |
+ # so don't raise here. |
+ pass |
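The NotFoundException guard makes this cleanup idempotent: if a lower-layer retry already removed the temporary component, the wrapper still counts as a success. A minimal sketch of the same pattern against a hypothetical client (NotFoundError and delete_object are assumptions, not the gsutil API):

class NotFoundError(Exception):
  """Stand-in for the cloud API's not-found exception."""

def delete_temp_component(api, bucket, name, generation=None):
  """Deletes a temporary component, treating 'already gone' as success."""
  try:
    # Hypothetical client call; the real code goes through gsutil_api.DeleteObject.
    api.delete_object(bucket, name, generation=generation)
  except NotFoundError:
    # A retry at a lower layer may have deleted the object after the original
    # request succeeded, so a missing object is not an error here.
    pass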
def _ParseParallelUploadTrackerFile(tracker_file, tracker_file_lock): |