| Index: tools/telemetry/third_party/gsutilz/gslib/copy_helper.py
|
| diff --git a/tools/telemetry/third_party/gsutilz/gslib/copy_helper.py b/tools/telemetry/third_party/gsutilz/gslib/copy_helper.py
|
| deleted file mode 100644
|
| index 456b15cf47a011e3c5b71a426c46dffeb520a833..0000000000000000000000000000000000000000
|
| --- a/tools/telemetry/third_party/gsutilz/gslib/copy_helper.py
|
| +++ /dev/null
|
| @@ -1,2787 +0,0 @@
|
| -# -*- coding: utf-8 -*-
|
| -# Copyright 2011 Google Inc. All Rights Reserved.
|
| -# Copyright 2011, Nexenta Systems Inc.
|
| -#
|
| -# Licensed under the Apache License, Version 2.0 (the "License");
|
| -# you may not use this file except in compliance with the License.
|
| -# You may obtain a copy of the License at
|
| -#
|
| -# http://www.apache.org/licenses/LICENSE-2.0
|
| -#
|
| -# Unless required by applicable law or agreed to in writing, software
|
| -# distributed under the License is distributed on an "AS IS" BASIS,
|
| -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
| -# See the License for the specific language governing permissions and
|
| -# limitations under the License.
|
| -"""Helper functions for copy functionality."""
|
| -
|
| -from __future__ import absolute_import
|
| -
|
| -import base64
|
| -from collections import namedtuple
|
| -import csv
|
| -import datetime
|
| -import errno
|
| -import gzip
|
| -from hashlib import md5
|
| -import json
|
| -import logging
|
| -import mimetypes
|
| -import multiprocessing
|
| -import os
|
| -import pickle
|
| -import random
|
| -import re
|
| -import shutil
|
| -import stat
|
| -import subprocess
|
| -import tempfile
|
| -import textwrap
|
| -import time
|
| -import traceback
|
| -
|
| -from boto import config
|
| -import crcmod
|
| -
|
| -import gslib
|
| -from gslib.cloud_api import ArgumentException
|
| -from gslib.cloud_api import CloudApi
|
| -from gslib.cloud_api import NotFoundException
|
| -from gslib.cloud_api import PreconditionException
|
| -from gslib.cloud_api import Preconditions
|
| -from gslib.cloud_api import ResumableDownloadException
|
| -from gslib.cloud_api import ResumableUploadAbortException
|
| -from gslib.cloud_api import ResumableUploadException
|
| -from gslib.cloud_api import ResumableUploadStartOverException
|
| -from gslib.cloud_api_helper import GetDownloadSerializationDict
|
| -from gslib.commands.compose import MAX_COMPOSE_ARITY
|
| -from gslib.commands.config import DEFAULT_PARALLEL_COMPOSITE_UPLOAD_COMPONENT_SIZE
|
| -from gslib.commands.config import DEFAULT_PARALLEL_COMPOSITE_UPLOAD_THRESHOLD
|
| -from gslib.cs_api_map import ApiSelector
|
| -from gslib.daisy_chain_wrapper import DaisyChainWrapper
|
| -from gslib.exception import CommandException
|
| -from gslib.exception import HashMismatchException
|
| -from gslib.file_part import FilePart
|
| -from gslib.hashing_helper import Base64EncodeHash
|
| -from gslib.hashing_helper import CalculateB64EncodedMd5FromContents
|
| -from gslib.hashing_helper import CalculateHashesFromContents
|
| -from gslib.hashing_helper import GetDownloadHashAlgs
|
| -from gslib.hashing_helper import GetUploadHashAlgs
|
| -from gslib.hashing_helper import HashingFileUploadWrapper
|
| -from gslib.parallelism_framework_util import ThreadAndProcessSafeDict
|
| -from gslib.parallelism_framework_util import ThreadSafeDict
|
| -from gslib.progress_callback import ConstructAnnounceText
|
| -from gslib.progress_callback import FileProgressCallbackHandler
|
| -from gslib.progress_callback import ProgressCallbackWithBackoff
|
| -from gslib.resumable_streaming_upload import ResumableStreamingJsonUploadWrapper
|
| -from gslib.storage_url import ContainsWildcard
|
| -from gslib.storage_url import StorageUrlFromString
|
| -from gslib.third_party.storage_apitools import storage_v1_messages as apitools_messages
|
| -from gslib.tracker_file import DeleteTrackerFile
|
| -from gslib.tracker_file import GetTrackerFilePath
|
| -from gslib.tracker_file import RaiseUnwritableTrackerFileException
|
| -from gslib.tracker_file import ReadOrCreateDownloadTrackerFile
|
| -from gslib.tracker_file import TrackerFileType
|
| -from gslib.translation_helper import AddS3MarkerAclToObjectMetadata
|
| -from gslib.translation_helper import CopyObjectMetadata
|
| -from gslib.translation_helper import DEFAULT_CONTENT_TYPE
|
| -from gslib.translation_helper import GenerationFromUrlAndString
|
| -from gslib.translation_helper import ObjectMetadataFromHeaders
|
| -from gslib.translation_helper import PreconditionsFromHeaders
|
| -from gslib.translation_helper import S3MarkerAclFromObjectMetadata
|
| -from gslib.util import CreateLock
|
| -from gslib.util import DEFAULT_FILE_BUFFER_SIZE
|
| -from gslib.util import GetCloudApiInstance
|
| -from gslib.util import GetFileSize
|
| -from gslib.util import GetJsonResumableChunkSize
|
| -from gslib.util import GetMaxRetryDelay
|
| -from gslib.util import GetNumRetries
|
| -from gslib.util import GetStreamFromFileUrl
|
| -from gslib.util import HumanReadableToBytes
|
| -from gslib.util import IS_WINDOWS
|
| -from gslib.util import IsCloudSubdirPlaceholder
|
| -from gslib.util import MakeHumanReadable
|
| -from gslib.util import MIN_SIZE_COMPUTE_LOGGING
|
| -from gslib.util import MultiprocessingIsAvailable
|
| -from gslib.util import ResumableThreshold
|
| -from gslib.util import TEN_MIB
|
| -from gslib.util import UTF8
|
| -from gslib.wildcard_iterator import CreateWildcardIterator
|
| -
|
| -# pylint: disable=g-import-not-at-top
|
| -if IS_WINDOWS:
|
| - import msvcrt
|
| - from ctypes import c_int
|
| - from ctypes import c_uint64
|
| - from ctypes import c_char_p
|
| - from ctypes import c_wchar_p
|
| - from ctypes import windll
|
| - from ctypes import POINTER
|
| - from ctypes import WINFUNCTYPE
|
| - from ctypes import WinError
|
| -
|
| -# Declare copy_helper_opts as a global because namedtuple isn't aware of
|
| -# assigning to a class member (which breaks pickling done by multiprocessing).
|
| -# For details see
|
| -# http://stackoverflow.com/questions/16377215/how-to-pickle-a-namedtuple-instance-correctly
|
| -# Similarly can't pickle logger.
|
| -# pylint: disable=global-at-module-level
|
| -global global_copy_helper_opts, global_logger
|
| -
|
| -# In-memory map of local files that are currently opened for write. Used to
|
| -# ensure that if we write to the same file twice (say, for example, because the
|
| -# user specified two identical source URLs), the writes occur serially.
|
| -global open_files_map
|
| -open_files_map = (
|
| - ThreadSafeDict() if (IS_WINDOWS or not MultiprocessingIsAvailable()[0])
|
| - else ThreadAndProcessSafeDict(multiprocessing.Manager()))
|
| -
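
For illustration, a minimal standalone sketch of the per-file write serialization described in the comment above, using plain threading locks instead of gslib's ThreadSafeDict / ThreadAndProcessSafeDict (all names below are hypothetical):

    import threading

    _open_files_lock = threading.Lock()
    _open_files = {}  # destination path -> per-file lock

    def write_serially(path, data):
        # Register (or reuse) a per-path lock under the map lock, so that two
        # identical source URLs write to the same destination one at a time.
        with _open_files_lock:
            file_lock = _open_files.setdefault(path, threading.Lock())
        with file_lock:
            with open(path, 'ab') as fp:
                fp.write(data)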
|
| -# For debugging purposes; if True, files and objects that fail hash validation
|
| -# will be saved with the below suffix appended.
|
| -_RENAME_ON_HASH_MISMATCH = False
|
| -_RENAME_ON_HASH_MISMATCH_SUFFIX = '_corrupt'
|
| -
|
| -PARALLEL_UPLOAD_TEMP_NAMESPACE = (
|
| - u'/gsutil/tmp/parallel_composite_uploads/for_details_see/gsutil_help_cp/')
|
| -
|
| -PARALLEL_UPLOAD_STATIC_SALT = u"""
|
| -PARALLEL_UPLOAD_SALT_TO_PREVENT_COLLISIONS.
|
| -The theory is that no user will have prepended this to the front of
|
| -one of their object names and then done an MD5 hash of the name, and
|
| -then prepended PARALLEL_UPLOAD_TEMP_NAMESPACE to the front of their object
|
| -name. Note that there will be no problems with object name length since we
|
| -hash the original name.
|
| -"""
|
| -
|
| -# When uploading a file, get the following fields in the response for
|
| -# filling in command output and manifests.
|
| -UPLOAD_RETURN_FIELDS = ['crc32c', 'etag', 'generation', 'md5Hash', 'size']
|
| -
|
| -# This tuple is used only to encapsulate the arguments needed for
|
| -# command.Apply() in the parallel composite upload case.
|
| -# Note that content_type is used instead of a full apitools Object() because
|
| -# apitools objects are not picklable.
|
| -# filename: String name of file.
|
| -# file_start: start byte of file (may be in the middle of a file for partitioned
|
| -# files).
|
| -# file_length: length of upload (may not be the entire length of a file for
|
| -# partitioned files).
|
| -# src_url: FileUrl describing the source file.
|
| -# dst_url: CloudUrl describing the destination component file.
|
| -# canned_acl: canned_acl to apply to the uploaded file/component.
|
| -# content_type: content-type for final object, used for setting content-type
|
| -# of components and final object.
|
| -# tracker_file: tracker file for this component.
|
| -# tracker_file_lock: tracker file lock for tracker file(s).
|
| -PerformParallelUploadFileToObjectArgs = namedtuple(
|
| - 'PerformParallelUploadFileToObjectArgs',
|
| - 'filename file_start file_length src_url dst_url canned_acl '
|
| - 'content_type tracker_file tracker_file_lock')
|
| -
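
As a quick illustration of why a plain namedtuple is used here: it pickles cleanly, which is what multiprocessing-based Apply calls need. A simplified stand-in with an abbreviated field set:

    from collections import namedtuple
    import pickle

    UploadArgs = namedtuple('UploadArgs', 'filename file_start file_length')

    args = UploadArgs(filename='big.bin', file_start=0, file_length=8 * 1024 * 1024)
    # Round-trips through pickle, unlike apitools Object instances.
    assert pickle.loads(pickle.dumps(args)) == args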
|
| -ObjectFromTracker = namedtuple('ObjectFromTracker',
|
| - 'object_name generation')
|
| -
|
| -# TODO: Refactor this file to be less cumbersome. In particular, some of the
|
| -# different paths (e.g., uploading a file to an object vs. downloading an
|
| -# object to a file) could be split into separate files.
|
| -
|
| -# Chunk size to use while zipping/unzipping gzip files.
|
| -GZIP_CHUNK_SIZE = 8192
|
| -
|
| -PARALLEL_COMPOSITE_SUGGESTION_THRESHOLD = 150 * 1024 * 1024
|
| -
|
| -# S3 requires special Multipart upload logic (that we currently don't implement)
|
| -# for files > 5GiB in size.
|
| -S3_MAX_UPLOAD_SIZE = 5 * 1024 * 1024 * 1024
|
| -
|
| -suggested_parallel_composites = False
|
| -
|
| -
|
| -class FileConcurrencySkipError(Exception):
|
| - """Raised when skipping a file due to a concurrent, duplicate copy."""
|
| -
|
| -
|
| -def _RmExceptionHandler(cls, e):
|
| - """Simple exception handler to allow post-completion status."""
|
| - cls.logger.error(str(e))
|
| -
|
| -
|
| -def _ParallelUploadCopyExceptionHandler(cls, e):
|
| - """Simple exception handler to allow post-completion status."""
|
| - cls.logger.error(str(e))
|
| - cls.op_failure_count += 1
|
| - cls.logger.debug('\n\nEncountered exception while copying:\n%s\n',
|
| - traceback.format_exc())
|
| -
|
| -
|
| -def _PerformParallelUploadFileToObject(cls, args, thread_state=None):
|
| - """Function argument to Apply for performing parallel composite uploads.
|
| -
|
| - Args:
|
| - cls: Calling Command class.
|
| - args: PerformParallelUploadFileToObjectArgs tuple describing the target.
|
| - thread_state: gsutil Cloud API instance to use for the operation.
|
| -
|
| - Returns:
|
| - StorageUrl representing a successfully uploaded component.
|
| - """
|
| - fp = FilePart(args.filename, args.file_start, args.file_length)
|
| - gsutil_api = GetCloudApiInstance(cls, thread_state=thread_state)
|
| - with fp:
|
| - # We take many precautions with the component names that make collisions
|
| - # effectively impossible. Specifying preconditions will just allow us to
|
| - # reach a state in which uploads will always fail on retries.
|
| - preconditions = None
|
| -
|
| - # Fill in content type if one was provided.
|
| - dst_object_metadata = apitools_messages.Object(
|
| - name=args.dst_url.object_name,
|
| - bucket=args.dst_url.bucket_name,
|
| - contentType=args.content_type)
|
| -
|
| - try:
|
| - if global_copy_helper_opts.canned_acl:
|
| - # No canned ACL support in JSON, force XML API to be used for
|
| - # upload/copy operations.
|
| - orig_prefer_api = gsutil_api.prefer_api
|
| - gsutil_api.prefer_api = ApiSelector.XML
|
| - ret = _UploadFileToObject(args.src_url, fp, args.file_length,
|
| - args.dst_url, dst_object_metadata,
|
| - preconditions, gsutil_api, cls.logger, cls,
|
| - _ParallelUploadCopyExceptionHandler,
|
| - gzip_exts=None, allow_splitting=False)
|
| - finally:
|
| - if global_copy_helper_opts.canned_acl:
|
| - gsutil_api.prefer_api = orig_prefer_api
|
| -
|
| - component = ret[2]
|
| - _AppendComponentTrackerToParallelUploadTrackerFile(
|
| - args.tracker_file, component, args.tracker_file_lock)
|
| - return ret
|
| -
|
| -
|
| -CopyHelperOpts = namedtuple('CopyHelperOpts', [
|
| - 'perform_mv',
|
| - 'no_clobber',
|
| - 'daisy_chain',
|
| - 'read_args_from_stdin',
|
| - 'print_ver',
|
| - 'use_manifest',
|
| - 'preserve_acl',
|
| - 'canned_acl',
|
| - 'skip_unsupported_objects',
|
| - 'test_callback_file'])
|
| -
|
| -
|
| -# pylint: disable=global-variable-undefined
|
| -def CreateCopyHelperOpts(perform_mv=False, no_clobber=False, daisy_chain=False,
|
| - read_args_from_stdin=False, print_ver=False,
|
| - use_manifest=False, preserve_acl=False,
|
| - canned_acl=None, skip_unsupported_objects=False,
|
| - test_callback_file=None):
|
| - """Creates CopyHelperOpts for passing options to CopyHelper."""
|
| - # We create a tuple with union of options needed by CopyHelper and any
|
| - # copy-related functionality in CpCommand, RsyncCommand, or Command class.
|
| - global global_copy_helper_opts
|
| - global_copy_helper_opts = CopyHelperOpts(
|
| - perform_mv=perform_mv,
|
| - no_clobber=no_clobber,
|
| - daisy_chain=daisy_chain,
|
| - read_args_from_stdin=read_args_from_stdin,
|
| - print_ver=print_ver,
|
| - use_manifest=use_manifest,
|
| - preserve_acl=preserve_acl,
|
| - canned_acl=canned_acl,
|
| - skip_unsupported_objects=skip_unsupported_objects,
|
| - test_callback_file=test_callback_file)
|
| - return global_copy_helper_opts
|
| -
|
| -
|
| -# pylint: disable=global-variable-undefined
|
| -# pylint: disable=global-variable-not-assigned
|
| -def GetCopyHelperOpts():
|
| - """Returns namedtuple holding CopyHelper options."""
|
| - global global_copy_helper_opts
|
| - return global_copy_helper_opts
|
| -
|
| -
|
| -def _SelectDownloadStrategy(dst_url):
|
| - """Get download strategy based on the destination object.
|
| -
|
| - Args:
|
| - dst_url: Destination StorageUrl.
|
| -
|
| - Returns:
|
| - gsutil Cloud API DownloadStrategy.
|
| - """
|
| - dst_is_special = False
|
| - if dst_url.IsFileUrl():
|
| - # Check explicitly first because os.stat doesn't work on 'nul' in Windows.
|
| - if dst_url.object_name == os.devnull:
|
| - dst_is_special = True
|
| - try:
|
| - mode = os.stat(dst_url.object_name).st_mode
|
| - if stat.S_ISCHR(mode):
|
| - dst_is_special = True
|
| - except OSError:
|
| - pass
|
| -
|
| - if dst_is_special:
|
| - return CloudApi.DownloadStrategy.ONE_SHOT
|
| - else:
|
| - return CloudApi.DownloadStrategy.RESUMABLE
|
| -
|
| -
|
| -def _GetUploadTrackerData(tracker_file_name, logger):
|
| - """Reads tracker data from an upload tracker file if it exists.
|
| -
|
| - Args:
|
| - tracker_file_name: Tracker file name for this upload.
|
| - logger: for outputting log messages.
|
| -
|
| - Returns:
|
| - Serialization data if the tracker file already exists (resume existing
|
| - upload), None otherwise.
|
| - """
|
| - tracker_file = None
|
| -
|
| - # If we already have a matching tracker file, get the serialization data
|
| - # so that we can resume the upload.
|
| - try:
|
| - tracker_file = open(tracker_file_name, 'r')
|
| - tracker_data = tracker_file.read()
|
| - return tracker_data
|
| - except IOError as e:
|
| -    # Ignore non-existent file (happens the first time an upload is attempted on an
|
| - # object, or when re-starting an upload after a
|
| - # ResumableUploadStartOverException), but warn user for other errors.
|
| - if e.errno != errno.ENOENT:
|
| - logger.warn('Couldn\'t read upload tracker file (%s): %s. Restarting '
|
| - 'upload from scratch.', tracker_file_name, e.strerror)
|
| - finally:
|
| - if tracker_file:
|
| - tracker_file.close()
|
| -
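
A condensed sketch of the tracker-file read pattern above: a missing file means "start from scratch", while any other IOError is surfaced (the real code logs a warning and restarts the upload). Names here are illustrative only:

    import errno

    def read_tracker_data(tracker_path):
        # Returns saved serialization data if the tracker file exists, else None.
        try:
            with open(tracker_path, 'r') as fp:
                return fp.read()
        except IOError as e:
            if e.errno != errno.ENOENT:
                raise
            return None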
|
| -
|
| -def InsistDstUrlNamesContainer(exp_dst_url, have_existing_dst_container,
|
| - command_name):
|
| - """Ensures the destination URL names a container.
|
| -
|
| - Acceptable containers include directory, bucket, bucket
|
| - subdir, and non-existent bucket subdir.
|
| -
|
| - Args:
|
| - exp_dst_url: Wildcard-expanded destination StorageUrl.
|
| - have_existing_dst_container: bool indicator of whether exp_dst_url
|
| - names a container (directory, bucket, or existing bucket subdir).
|
| - command_name: Name of command making call. May not be the same as the
|
| - calling class's self.command_name in the case of commands implemented
|
| - atop other commands (like mv command).
|
| -
|
| - Raises:
|
| - CommandException: if the URL being checked does not name a container.
|
| - """
|
| - if ((exp_dst_url.IsFileUrl() and not exp_dst_url.IsDirectory()) or
|
| - (exp_dst_url.IsCloudUrl() and exp_dst_url.IsBucket()
|
| - and not have_existing_dst_container)):
|
| - raise CommandException('Destination URL must name a directory, bucket, '
|
| - 'or bucket\nsubdirectory for the multiple '
|
| - 'source form of the %s command.' % command_name)
|
| -
|
| -
|
| -def _ShouldTreatDstUrlAsBucketSubDir(have_multiple_srcs, dst_url,
|
| - have_existing_dest_subdir,
|
| - src_url_names_container,
|
| - recursion_requested):
|
| - """Checks whether dst_url should be treated as a bucket "sub-directory".
|
| -
|
| - The decision about whether something constitutes a bucket "sub-directory"
|
| - depends on whether there are multiple sources in this request and whether
|
| - there is an existing bucket subdirectory. For example, when running the
|
| - command:
|
| - gsutil cp file gs://bucket/abc
|
| - if there's no existing gs://bucket/abc bucket subdirectory we should copy
|
| - file to the object gs://bucket/abc. In contrast, if
|
| - there's an existing gs://bucket/abc bucket subdirectory we should copy
|
| - file to gs://bucket/abc/file. And regardless of whether gs://bucket/abc
|
| - exists, when running the command:
|
| - gsutil cp file1 file2 gs://bucket/abc
|
| - we should copy file1 to gs://bucket/abc/file1 (and similarly for file2).
|
| - Finally, for recursive copies, if the source is a container then we should
|
| - copy to a container as the target. For example, when running the command:
|
| - gsutil cp -r dir1 gs://bucket/dir2
|
| - we should copy the subtree of dir1 to gs://bucket/dir2.
|
| -
|
| - Note that we don't disallow naming a bucket "sub-directory" where there's
|
| - already an object at that URL. For example it's legitimate (albeit
|
| - confusing) to have an object called gs://bucket/dir and
|
| - then run the command
|
| - gsutil cp file1 file2 gs://bucket/dir
|
| - Doing so will end up with objects gs://bucket/dir, gs://bucket/dir/file1,
|
| - and gs://bucket/dir/file2.
|
| -
|
| - Args:
|
| - have_multiple_srcs: Bool indicator of whether this is a multi-source
|
| - operation.
|
| - dst_url: StorageUrl to check.
|
| - have_existing_dest_subdir: bool indicator whether dest is an existing
|
| - subdirectory.
|
| - src_url_names_container: bool indicator of whether the source URL
|
| - is a container.
|
| - recursion_requested: True if a recursive operation has been requested.
|
| -
|
| - Returns:
|
| - bool indicator.
|
| - """
|
| - if have_existing_dest_subdir:
|
| - return True
|
| - if dst_url.IsCloudUrl():
|
| - return (have_multiple_srcs or
|
| - (src_url_names_container and recursion_requested))
|
| -
|
| -
|
| -def _ShouldTreatDstUrlAsSingleton(have_multiple_srcs,
|
| - have_existing_dest_subdir, dst_url,
|
| - recursion_requested):
|
| - """Checks that dst_url names a single file/object after wildcard expansion.
|
| -
|
| - It is possible that an object path might name a bucket sub-directory.
|
| -
|
| - Args:
|
| - have_multiple_srcs: Bool indicator of whether this is a multi-source
|
| - operation.
|
| - have_existing_dest_subdir: bool indicator whether dest is an existing
|
| - subdirectory.
|
| - dst_url: StorageUrl to check.
|
| - recursion_requested: True if a recursive operation has been requested.
|
| -
|
| - Returns:
|
| - bool indicator.
|
| - """
|
| - if recursion_requested:
|
| - return False
|
| - if dst_url.IsFileUrl():
|
| - return not dst_url.IsDirectory()
|
| - else: # dst_url.IsCloudUrl()
|
| - return (not have_multiple_srcs and
|
| - not have_existing_dest_subdir and
|
| - dst_url.IsObject())
|
| -
|
| -
|
| -def ConstructDstUrl(src_url, exp_src_url, src_url_names_container,
|
| - have_multiple_srcs, exp_dst_url, have_existing_dest_subdir,
|
| - recursion_requested):
|
| - """Constructs the destination URL for a given exp_src_url/exp_dst_url pair.
|
| -
|
| - Uses context-dependent naming rules that mimic Linux cp and mv behavior.
|
| -
|
| - Args:
|
| - src_url: Source StorageUrl to be copied.
|
| - exp_src_url: Single StorageUrl from wildcard expansion of src_url.
|
| - src_url_names_container: True if src_url names a container (including the
|
| - case of a wildcard-named bucket subdir (like gs://bucket/abc,
|
| -      where gs://bucket/abc/* matched some objects)).
|
| - have_multiple_srcs: True if this is a multi-source request. This can be
|
| - true if src_url wildcard-expanded to multiple URLs or if there were
|
| - multiple source URLs in the request.
|
| - exp_dst_url: the expanded StorageUrl requested for the cp destination.
|
| - Final written path is constructed from this plus a context-dependent
|
| - variant of src_url.
|
| - have_existing_dest_subdir: bool indicator whether dest is an existing
|
| - subdirectory.
|
| - recursion_requested: True if a recursive operation has been requested.
|
| -
|
| - Returns:
|
| - StorageUrl to use for copy.
|
| -
|
| - Raises:
|
| -    CommandException if no destination object name is specified and the
|
| -    source is a stream.
|
| - """
|
| - if _ShouldTreatDstUrlAsSingleton(
|
| - have_multiple_srcs, have_existing_dest_subdir, exp_dst_url,
|
| - recursion_requested):
|
| - # We're copying one file or object to one file or object.
|
| - return exp_dst_url
|
| -
|
| - if exp_src_url.IsFileUrl() and exp_src_url.IsStream():
|
| - if have_existing_dest_subdir:
|
| - raise CommandException('Destination object name needed when '
|
| - 'source is a stream')
|
| - return exp_dst_url
|
| -
|
| - if not recursion_requested and not have_multiple_srcs:
|
| - # We're copying one file or object to a subdirectory. Append final comp
|
| - # of exp_src_url to exp_dst_url.
|
| - src_final_comp = exp_src_url.object_name.rpartition(src_url.delim)[-1]
|
| - return StorageUrlFromString('%s%s%s' % (
|
| - exp_dst_url.url_string.rstrip(exp_dst_url.delim),
|
| - exp_dst_url.delim, src_final_comp))
|
| -
|
| - # Else we're copying multiple sources to a directory, bucket, or a bucket
|
| - # "sub-directory".
|
| -
|
| - # Ensure exp_dst_url ends in delim char if we're doing a multi-src copy or
|
| - # a copy to a directory. (The check for copying to a directory needs
|
| - # special-case handling so that the command:
|
| - # gsutil cp gs://bucket/obj dir
|
| - # will turn into file://dir/ instead of file://dir -- the latter would cause
|
| - # the file "dirobj" to be created.)
|
| - # Note: need to check have_multiple_srcs or src_url.names_container()
|
| - # because src_url could be a bucket containing a single object, named
|
| - # as gs://bucket.
|
| - if ((have_multiple_srcs or src_url_names_container or
|
| - (exp_dst_url.IsFileUrl() and exp_dst_url.IsDirectory()))
|
| - and not exp_dst_url.url_string.endswith(exp_dst_url.delim)):
|
| - exp_dst_url = StorageUrlFromString('%s%s' % (exp_dst_url.url_string,
|
| - exp_dst_url.delim))
|
| -
|
| - # Making naming behavior match how things work with local Linux cp and mv
|
| - # operations depends on many factors, including whether the destination is a
|
| - # container, the plurality of the source(s), and whether the mv command is
|
| - # being used:
|
| - # 1. For the "mv" command that specifies a non-existent destination subdir,
|
| - # renaming should occur at the level of the src subdir, vs appending that
|
| - # subdir beneath the dst subdir like is done for copying. For example:
|
| - # gsutil rm -r gs://bucket
|
| - # gsutil cp -r dir1 gs://bucket
|
| - # gsutil cp -r dir2 gs://bucket/subdir1
|
| - # gsutil mv gs://bucket/subdir1 gs://bucket/subdir2
|
| - # would (if using cp naming behavior) end up with paths like:
|
| - # gs://bucket/subdir2/subdir1/dir2/.svn/all-wcprops
|
| - # whereas mv naming behavior should result in:
|
| - # gs://bucket/subdir2/dir2/.svn/all-wcprops
|
| - # 2. Copying from directories, buckets, or bucket subdirs should result in
|
| - # objects/files mirroring the source directory hierarchy. For example:
|
| - # gsutil cp dir1/dir2 gs://bucket
|
| -  #    should create the object gs://bucket/dir2/file2 (assuming dir1/dir2
|
| - # contains file2).
|
| - # To be consistent with Linux cp behavior, there's one more wrinkle when
|
| - # working with subdirs: The resulting object names depend on whether the
|
| - # destination subdirectory exists. For example, if gs://bucket/subdir
|
| - # exists, the command:
|
| - # gsutil cp -r dir1/dir2 gs://bucket/subdir
|
| - # should create objects named like gs://bucket/subdir/dir2/a/b/c. In
|
| - # contrast, if gs://bucket/subdir does not exist, this same command
|
| - # should create objects named like gs://bucket/subdir/a/b/c.
|
| - # 3. Copying individual files or objects to dirs, buckets or bucket subdirs
|
| - # should result in objects/files named by the final source file name
|
| - # component. Example:
|
| - # gsutil cp dir1/*.txt gs://bucket
|
| - # should create the objects gs://bucket/f1.txt and gs://bucket/f2.txt,
|
| - # assuming dir1 contains f1.txt and f2.txt.
|
| -
|
| - recursive_move_to_new_subdir = False
|
| - if (global_copy_helper_opts.perform_mv and recursion_requested
|
| - and src_url_names_container and not have_existing_dest_subdir):
|
| - # Case 1. Handle naming rules for bucket subdir mv. Here we want to
|
| - # line up the src_url against its expansion, to find the base to build
|
| - # the new name. For example, running the command:
|
| - # gsutil mv gs://bucket/abcd gs://bucket/xyz
|
| - # when processing exp_src_url=gs://bucket/abcd/123
|
| - # exp_src_url_tail should become /123
|
| - # Note: mv.py code disallows wildcard specification of source URL.
|
| - recursive_move_to_new_subdir = True
|
| - exp_src_url_tail = (
|
| - exp_src_url.url_string[len(src_url.url_string):])
|
| - dst_key_name = '%s/%s' % (exp_dst_url.object_name.rstrip('/'),
|
| - exp_src_url_tail.strip('/'))
|
| -
|
| - elif src_url_names_container and (exp_dst_url.IsCloudUrl() or
|
| - exp_dst_url.IsDirectory()):
|
| - # Case 2. Container copy to a destination other than a file.
|
| - # Build dst_key_name from subpath of exp_src_url past
|
| - # where src_url ends. For example, for src_url=gs://bucket/ and
|
| - # exp_src_url=gs://bucket/src_subdir/obj, dst_key_name should be
|
| - # src_subdir/obj.
|
| - src_url_path_sans_final_dir = GetPathBeforeFinalDir(src_url)
|
| - dst_key_name = exp_src_url.versionless_url_string[
|
| - len(src_url_path_sans_final_dir):].lstrip(src_url.delim)
|
| - # Handle case where dst_url is a non-existent subdir.
|
| - if not have_existing_dest_subdir:
|
| - dst_key_name = dst_key_name.partition(src_url.delim)[-1]
|
| - # Handle special case where src_url was a directory named with '.' or
|
| - # './', so that running a command like:
|
| - # gsutil cp -r . gs://dest
|
| - # will produce obj names of the form gs://dest/abc instead of
|
| - # gs://dest/./abc.
|
| - if dst_key_name.startswith('.%s' % os.sep):
|
| - dst_key_name = dst_key_name[2:]
|
| -
|
| - else:
|
| - # Case 3.
|
| - dst_key_name = exp_src_url.object_name.rpartition(src_url.delim)[-1]
|
| -
|
| - if (not recursive_move_to_new_subdir and (
|
| - exp_dst_url.IsFileUrl() or _ShouldTreatDstUrlAsBucketSubDir(
|
| - have_multiple_srcs, exp_dst_url, have_existing_dest_subdir,
|
| - src_url_names_container, recursion_requested))):
|
| - if exp_dst_url.object_name and exp_dst_url.object_name.endswith(
|
| - exp_dst_url.delim):
|
| - dst_key_name = '%s%s%s' % (
|
| - exp_dst_url.object_name.rstrip(exp_dst_url.delim),
|
| - exp_dst_url.delim, dst_key_name)
|
| - else:
|
| - delim = exp_dst_url.delim if exp_dst_url.object_name else ''
|
| - dst_key_name = '%s%s%s' % (exp_dst_url.object_name or '',
|
| - delim, dst_key_name)
|
| -
|
| - new_exp_dst_url = exp_dst_url.Clone()
|
| - new_exp_dst_url.object_name = dst_key_name.replace(src_url.delim,
|
| - exp_dst_url.delim)
|
| - return new_exp_dst_url
|
| -
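
A small worked example of case 3 from the comments above (single files copied into a container keep only their final path component). This is a simplified stand-in, not the real ConstructDstUrl logic:

    import posixpath

    def naive_dst_name(src_name, dst_prefix, dst_is_container):
        # Copying one file into a directory/bucket/subdir appends its basename;
        # copying onto a singleton destination keeps the destination name as-is.
        if dst_is_container:
            return posixpath.join(dst_prefix, posixpath.basename(src_name))
        return dst_prefix

    assert naive_dst_name('dir1/f1.txt', 'bucket/subdir', True) == 'bucket/subdir/f1.txt'
    assert naive_dst_name('dir1/f1.txt', 'bucket/newname', False) == 'bucket/newname'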
|
| -
|
| -def _CreateDigestsFromDigesters(digesters):
|
| - digests = {}
|
| - if digesters:
|
| - for alg in digesters:
|
| - digests[alg] = base64.encodestring(
|
| - digesters[alg].digest()).rstrip('\n')
|
| - return digests
|
| -
|
| -
|
| -def _CreateDigestsFromLocalFile(logger, algs, file_name, src_obj_metadata):
|
| - """Creates a base64 CRC32C and/or MD5 digest from file_name.
|
| -
|
| - Args:
|
| - logger: for outputting log messages.
|
| - algs: list of algorithms to compute.
|
| - file_name: file to digest.
|
| -    src_obj_metadata: metadata of the source object.
|
| -
|
| - Returns:
|
| - Dict of algorithm name : base 64 encoded digest
|
| - """
|
| - hash_dict = {}
|
| - if 'md5' in algs:
|
| - if src_obj_metadata.size and src_obj_metadata.size > TEN_MIB:
|
| - logger.info(
|
| - 'Computing MD5 for %s...', file_name)
|
| - hash_dict['md5'] = md5()
|
| - if 'crc32c' in algs:
|
| - hash_dict['crc32c'] = crcmod.predefined.Crc('crc-32c')
|
| - with open(file_name, 'rb') as fp:
|
| - CalculateHashesFromContents(
|
| - fp, hash_dict, ProgressCallbackWithBackoff(
|
| - src_obj_metadata.size,
|
| - FileProgressCallbackHandler(
|
| - ConstructAnnounceText('Hashing', file_name), logger).call))
|
| - digests = {}
|
| - for alg_name, digest in hash_dict.iteritems():
|
| - digests[alg_name] = Base64EncodeHash(digest.hexdigest())
|
| - return digests
|
| -
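
For reference, a self-contained sketch of producing the base64-encoded MD5 that cloud object metadata stores, streaming the file in chunks as the helper above does via CalculateHashesFromContents:

    import base64
    from hashlib import md5

    def b64_md5_of_file(path, chunk_size=8192):
        digest = md5()
        with open(path, 'rb') as fp:
            # Stream in chunks so large files never have to fit in memory.
            for chunk in iter(lambda: fp.read(chunk_size), b''):
                digest.update(chunk)
        # Object metadata carries base64 digests, not hex strings.
        return base64.b64encode(digest.digest()).decode('ascii')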
|
| -
|
| -def _CheckCloudHashes(logger, src_url, dst_url, src_obj_metadata,
|
| - dst_obj_metadata):
|
| - """Validates integrity of two cloud objects copied via daisy-chain.
|
| -
|
| - Args:
|
| - logger: for outputting log messages.
|
| - src_url: CloudUrl for source cloud object.
|
| - dst_url: CloudUrl for destination cloud object.
|
| - src_obj_metadata: Cloud Object metadata for object being downloaded from.
|
| - dst_obj_metadata: Cloud Object metadata for object being uploaded to.
|
| -
|
| - Raises:
|
| - CommandException: if cloud digests don't match local digests.
|
| - """
|
| - checked_one = False
|
| - download_hashes = {}
|
| - upload_hashes = {}
|
| - if src_obj_metadata.md5Hash:
|
| - download_hashes['md5'] = src_obj_metadata.md5Hash
|
| - if src_obj_metadata.crc32c:
|
| - download_hashes['crc32c'] = src_obj_metadata.crc32c
|
| - if dst_obj_metadata.md5Hash:
|
| - upload_hashes['md5'] = dst_obj_metadata.md5Hash
|
| - if dst_obj_metadata.crc32c:
|
| - upload_hashes['crc32c'] = dst_obj_metadata.crc32c
|
| -
|
| - for alg, upload_b64_digest in upload_hashes.iteritems():
|
| - if alg not in download_hashes:
|
| - continue
|
| -
|
| - download_b64_digest = download_hashes[alg]
|
| - logger.debug(
|
| - 'Comparing source vs destination %s-checksum for %s. (%s/%s)', alg,
|
| - dst_url, download_b64_digest, upload_b64_digest)
|
| - if download_b64_digest != upload_b64_digest:
|
| - raise HashMismatchException(
|
| - '%s signature for source object (%s) doesn\'t match '
|
| - 'destination object digest (%s). Object (%s) will be deleted.' % (
|
| - alg, download_b64_digest, upload_b64_digest, dst_url))
|
| - checked_one = True
|
| - if not checked_one:
|
| - # One known way this can currently happen is when downloading objects larger
|
| - # than 5 GiB from S3 (for which the etag is not an MD5).
|
| - logger.warn(
|
| - 'WARNING: Found no hashes to validate object downloaded from %s and '
|
| - 'uploaded to %s. Integrity cannot be assured without hashes.',
|
| - src_url, dst_url)
|
| -
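
The core of the daisy-chain check above, reduced to a standalone sketch: compare only the algorithms both sides report, and treat "nothing comparable" as a warning case rather than a failure:

    def compare_hashes(src_hashes, dst_hashes):
        # Returns the list of algorithms actually checked; raises on any mismatch.
        checked = []
        for alg, dst_digest in dst_hashes.items():
            if alg not in src_hashes:
                continue
            if src_hashes[alg] != dst_digest:
                raise ValueError('%s mismatch: %s != %s'
                                 % (alg, src_hashes[alg], dst_digest))
            checked.append(alg)
        return checked

    # Only crc32c is present on both sides, so only crc32c is validated.
    assert compare_hashes({'md5': 'abc', 'crc32c': 'xyz'}, {'crc32c': 'xyz'}) == ['crc32c']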
|
| -
|
| -def _CheckHashes(logger, obj_url, obj_metadata, file_name, digests,
|
| - is_upload=False):
|
| - """Validates integrity by comparing cloud digest to local digest.
|
| -
|
| - Args:
|
| - logger: for outputting log messages.
|
| - obj_url: CloudUrl for cloud object.
|
| - obj_metadata: Cloud Object being downloaded from or uploaded to.
|
| - file_name: Local file name on disk being downloaded to or uploaded from.
|
| - digests: Computed Digests for the object.
|
| - is_upload: If true, comparing for an uploaded object (controls logging).
|
| -
|
| - Raises:
|
| - CommandException: if cloud digests don't match local digests.
|
| - """
|
| - local_hashes = digests
|
| - cloud_hashes = {}
|
| - if obj_metadata.md5Hash:
|
| - cloud_hashes['md5'] = obj_metadata.md5Hash.rstrip('\n')
|
| - if obj_metadata.crc32c:
|
| - cloud_hashes['crc32c'] = obj_metadata.crc32c.rstrip('\n')
|
| -
|
| - checked_one = False
|
| - for alg in local_hashes:
|
| - if alg not in cloud_hashes:
|
| - continue
|
| -
|
| - local_b64_digest = local_hashes[alg]
|
| - cloud_b64_digest = cloud_hashes[alg]
|
| - logger.debug(
|
| - 'Comparing local vs cloud %s-checksum for %s. (%s/%s)', alg, file_name,
|
| - local_b64_digest, cloud_b64_digest)
|
| - if local_b64_digest != cloud_b64_digest:
|
| -
|
| - raise HashMismatchException(
|
| - '%s signature computed for local file (%s) doesn\'t match '
|
| - 'cloud-supplied digest (%s). %s (%s) will be deleted.' % (
|
| - alg, local_b64_digest, cloud_b64_digest,
|
| - 'Cloud object' if is_upload else 'Local file',
|
| - obj_url if is_upload else file_name))
|
| - checked_one = True
|
| - if not checked_one:
|
| - if is_upload:
|
| - logger.warn(
|
| - 'WARNING: Found no hashes to validate object uploaded to %s. '
|
| - 'Integrity cannot be assured without hashes.', obj_url)
|
| - else:
|
| - # One known way this can currently happen is when downloading objects larger
|
| - # than 5 GB from S3 (for which the etag is not an MD5).
|
| - logger.warn(
|
| - 'WARNING: Found no hashes to validate object downloaded to %s. '
|
| - 'Integrity cannot be assured without hashes.', file_name)
|
| -
|
| -
|
| -def IsNoClobberServerException(e):
|
| - """Checks to see if the server attempted to clobber a file.
|
| -
|
| - In this case we specified via a precondition that we didn't want the file
|
| - clobbered.
|
| -
|
| - Args:
|
| - e: The Exception that was generated by a failed copy operation
|
| -
|
| - Returns:
|
| - bool indicator - True indicates that the server did attempt to clobber
|
| - an existing file.
|
| - """
|
| - return ((isinstance(e, PreconditionException)) or
|
| - (isinstance(e, ResumableUploadException) and '412' in e.message))
|
| -
|
| -
|
| -def CheckForDirFileConflict(exp_src_url, dst_url):
|
| - """Checks whether copying exp_src_url into dst_url is not possible.
|
| -
|
| -  This happens if a directory exists in the local file system where a file
|
| -  needs to go or vice versa. In that case we print an error message and
|
| -  exit. Example: if the file "./x" exists and you try to do:
|
| - gsutil cp gs://mybucket/x/y .
|
| - the request can't succeed because it requires a directory where
|
| - the file x exists.
|
| -
|
| - Note that we don't enforce any corresponding restrictions for buckets,
|
| - because the flat namespace semantics for buckets doesn't prohibit such
|
| - cases the way hierarchical file systems do. For example, if a bucket
|
| - contains an object called gs://bucket/dir and then you run the command:
|
| - gsutil cp file1 file2 gs://bucket/dir
|
| - you'll end up with objects gs://bucket/dir, gs://bucket/dir/file1, and
|
| - gs://bucket/dir/file2.
|
| -
|
| - Args:
|
| - exp_src_url: Expanded source StorageUrl.
|
| - dst_url: Destination StorageUrl.
|
| -
|
| - Raises:
|
| - CommandException: if errors encountered.
|
| - """
|
| - if dst_url.IsCloudUrl():
|
| - # The problem can only happen for file destination URLs.
|
| - return
|
| - dst_path = dst_url.object_name
|
| - final_dir = os.path.dirname(dst_path)
|
| - if os.path.isfile(final_dir):
|
| - raise CommandException('Cannot retrieve %s because a file exists '
|
| - 'where a directory needs to be created (%s).' %
|
| - (exp_src_url.url_string, final_dir))
|
| - if os.path.isdir(dst_path):
|
| - raise CommandException('Cannot retrieve %s because a directory exists '
|
| - '(%s) where the file needs to be created.' %
|
| - (exp_src_url.url_string, dst_path))
|
| -
|
| -
|
| -def _PartitionFile(fp, file_size, src_url, content_type, canned_acl,
|
| - dst_bucket_url, random_prefix, tracker_file,
|
| - tracker_file_lock):
|
| - """Partitions a file into FilePart objects to be uploaded and later composed.
|
| -
|
| - These objects, when composed, will match the original file. This entails
|
| - splitting the file into parts, naming and forming a destination URL for each
|
| - part, and also providing the PerformParallelUploadFileToObjectArgs
|
| - corresponding to each part.
|
| -
|
| - Args:
|
| - fp: The file object to be partitioned.
|
| - file_size: The size of fp, in bytes.
|
| - src_url: Source FileUrl from the original command.
|
| - content_type: content type for the component and final objects.
|
| - canned_acl: The user-provided canned_acl, if applicable.
|
| - dst_bucket_url: CloudUrl for the destination bucket
|
| - random_prefix: The randomly-generated prefix used to prevent collisions
|
| - among the temporary component names.
|
| - tracker_file: The path to the parallel composite upload tracker file.
|
| - tracker_file_lock: The lock protecting access to the tracker file.
|
| -
|
| - Returns:
|
| -    dst_args: Dict mapping temporary component names to their upload args.
|
| - """
|
| - parallel_composite_upload_component_size = HumanReadableToBytes(
|
| - config.get('GSUtil', 'parallel_composite_upload_component_size',
|
| - DEFAULT_PARALLEL_COMPOSITE_UPLOAD_COMPONENT_SIZE))
|
| - (num_components, component_size) = _GetPartitionInfo(
|
| - file_size, MAX_COMPOSE_ARITY, parallel_composite_upload_component_size)
|
| -
|
| - dst_args = {} # Arguments to create commands and pass to subprocesses.
|
| - file_names = [] # Used for the 2-step process of forming dst_args.
|
| - for i in range(num_components):
|
| - # "Salt" the object name with something a user is very unlikely to have
|
| - # used in an object name, then hash the extended name to make sure
|
| - # we don't run into problems with name length. Using a deterministic
|
| - # naming scheme for the temporary components allows users to take
|
| - # advantage of resumable uploads for each component.
|
| - encoded_name = (PARALLEL_UPLOAD_STATIC_SALT + fp.name).encode(UTF8)
|
| - content_md5 = md5()
|
| - content_md5.update(encoded_name)
|
| - digest = content_md5.hexdigest()
|
| - temp_file_name = (random_prefix + PARALLEL_UPLOAD_TEMP_NAMESPACE +
|
| - digest + '_' + str(i))
|
| - tmp_dst_url = dst_bucket_url.Clone()
|
| - tmp_dst_url.object_name = temp_file_name
|
| -
|
| - if i < (num_components - 1):
|
| - # Every component except possibly the last is the same size.
|
| - file_part_length = component_size
|
| - else:
|
| - # The last component just gets all of the remaining bytes.
|
| - file_part_length = (file_size - ((num_components -1) * component_size))
|
| - offset = i * component_size
|
| - func_args = PerformParallelUploadFileToObjectArgs(
|
| - fp.name, offset, file_part_length, src_url, tmp_dst_url, canned_acl,
|
| - content_type, tracker_file, tracker_file_lock)
|
| - file_names.append(temp_file_name)
|
| - dst_args[temp_file_name] = func_args
|
| -
|
| - return dst_args
|
| -
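
A simplified sketch of the partitioning and deterministic component naming described above; it ignores MAX_COMPOSE_ARITY and uses placeholder salt/namespace values, so it is illustrative only:

    import hashlib

    SALT = 'PARALLEL_UPLOAD_SALT'   # placeholder for PARALLEL_UPLOAD_STATIC_SALT
    NAMESPACE = '/tmp_namespace/'   # placeholder for PARALLEL_UPLOAD_TEMP_NAMESPACE

    def plan_components(file_name, file_size, component_size, random_prefix):
        # Hash the salted source name so component names stay short and stable,
        # which lets each component resume independently on retry.
        digest = hashlib.md5((SALT + file_name).encode('utf-8')).hexdigest()
        num_components = (file_size + component_size - 1) // component_size
        parts = []
        for i in range(num_components):
            offset = i * component_size
            length = min(component_size, file_size - offset)
            parts.append(('%s%s%s_%d' % (random_prefix, NAMESPACE, digest, i),
                          offset, length))
        return parts

    # 25 bytes split into 10-byte components: the last one gets the remainder.
    assert [(o, l) for _, o, l in plan_components('big.bin', 25, 10, 'rnd')] == [
        (0, 10), (10, 10), (20, 5)]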
|
| -
|
| -def _DoParallelCompositeUpload(fp, src_url, dst_url, dst_obj_metadata,
|
| - canned_acl, file_size, preconditions, gsutil_api,
|
| - command_obj, copy_exception_handler):
|
| - """Uploads a local file to a cloud object using parallel composite upload.
|
| -
|
| - The file is partitioned into parts, and then the parts are uploaded in
|
| - parallel, composed to form the original destination object, and deleted.
|
| -
|
| - Args:
|
| - fp: The file object to be uploaded.
|
| - src_url: FileUrl representing the local file.
|
| - dst_url: CloudUrl representing the destination file.
|
| - dst_obj_metadata: apitools Object describing the destination object.
|
| - canned_acl: The canned acl to apply to the object, if any.
|
| - file_size: The size of the source file in bytes.
|
| - preconditions: Cloud API Preconditions for the final object.
|
| - gsutil_api: gsutil Cloud API instance to use.
|
| - command_obj: Command object (for calling Apply).
|
| - copy_exception_handler: Copy exception handler (for use in Apply).
|
| -
|
| - Returns:
|
| - Elapsed upload time, uploaded Object with generation, crc32c, and size
|
| - fields populated.
|
| - """
|
| - start_time = time.time()
|
| - dst_bucket_url = StorageUrlFromString(dst_url.bucket_url_string)
|
| - api_selector = gsutil_api.GetApiSelector(provider=dst_url.scheme)
|
| - # Determine which components, if any, have already been successfully
|
| - # uploaded.
|
| - tracker_file = GetTrackerFilePath(dst_url, TrackerFileType.PARALLEL_UPLOAD,
|
| - api_selector, src_url)
|
| - tracker_file_lock = CreateLock()
|
| - (random_prefix, existing_components) = (
|
| - _ParseParallelUploadTrackerFile(tracker_file, tracker_file_lock))
|
| -
|
| - # Create the initial tracker file for the upload.
|
| - _CreateParallelUploadTrackerFile(tracker_file, random_prefix,
|
| - existing_components, tracker_file_lock)
|
| -
|
| - # Get the set of all components that should be uploaded.
|
| - dst_args = _PartitionFile(
|
| - fp, file_size, src_url, dst_obj_metadata.contentType, canned_acl,
|
| - dst_bucket_url, random_prefix, tracker_file, tracker_file_lock)
|
| -
|
| - (components_to_upload, existing_components, existing_objects_to_delete) = (
|
| - FilterExistingComponents(dst_args, existing_components, dst_bucket_url,
|
| - gsutil_api))
|
| -
|
| - # In parallel, copy all of the file parts that haven't already been
|
| - # uploaded to temporary objects.
|
| - cp_results = command_obj.Apply(
|
| - _PerformParallelUploadFileToObject, components_to_upload,
|
| - copy_exception_handler, ('op_failure_count', 'total_bytes_transferred'),
|
| - arg_checker=gslib.command.DummyArgChecker,
|
| - parallel_operations_override=True, should_return_results=True)
|
| - uploaded_components = []
|
| - for cp_result in cp_results:
|
| - uploaded_components.append(cp_result[2])
|
| - components = uploaded_components + existing_components
|
| -
|
| - if len(components) == len(dst_args):
|
| - # Only try to compose if all of the components were uploaded successfully.
|
| -
|
| - def _GetComponentNumber(component):
|
| - return int(component.object_name[component.object_name.rfind('_')+1:])
|
| - # Sort the components so that they will be composed in the correct order.
|
| - components = sorted(components, key=_GetComponentNumber)
|
| -
|
| - request_components = []
|
| - for component_url in components:
|
| - src_obj_metadata = (
|
| - apitools_messages.ComposeRequest.SourceObjectsValueListEntry(
|
| - name=component_url.object_name))
|
| - if component_url.HasGeneration():
|
| - src_obj_metadata.generation = long(component_url.generation)
|
| - request_components.append(src_obj_metadata)
|
| -
|
| - composed_object = gsutil_api.ComposeObject(
|
| - request_components, dst_obj_metadata, preconditions=preconditions,
|
| - provider=dst_url.scheme, fields=['generation', 'crc32c', 'size'])
|
| -
|
| - try:
|
| - # Make sure only to delete things that we know were successfully
|
| - # uploaded (as opposed to all of the objects that we attempted to
|
| - # create) so that we don't delete any preexisting objects, except for
|
| - # those that were uploaded by a previous, failed run and have since
|
| - # changed (but still have an old generation lying around).
|
| - objects_to_delete = components + existing_objects_to_delete
|
| - command_obj.Apply(_DeleteObjectFn, objects_to_delete, _RmExceptionHandler,
|
| - arg_checker=gslib.command.DummyArgChecker,
|
| - parallel_operations_override=True)
|
| - except Exception: # pylint: disable=broad-except
|
| - # If some of the delete calls fail, don't cause the whole command to
|
| - # fail. The copy was successful iff the compose call succeeded, so
|
| - # reduce this to a warning.
|
| - logging.warning(
|
| - 'Failed to delete some of the following temporary objects:\n' +
|
| - '\n'.join(dst_args.keys()))
|
| - finally:
|
| - with tracker_file_lock:
|
| - if os.path.exists(tracker_file):
|
| - os.unlink(tracker_file)
|
| - else:
|
| - # Some of the components failed to upload. In this case, we want to exit
|
| - # without deleting the objects.
|
| - raise CommandException(
|
| - 'Some temporary components were not uploaded successfully. '
|
| - 'Please retry this upload.')
|
| -
|
| - elapsed_time = time.time() - start_time
|
| - return elapsed_time, composed_object
|
| -
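
One detail worth calling out from _GetComponentNumber above: compose order must follow the numeric suffix, not lexicographic order. A tiny standalone check:

    def component_number(object_name):
        # Component names end in '_<index>'; sort by that integer.
        return int(object_name[object_name.rfind('_') + 1:])

    names = ['prefix/abc_10', 'prefix/abc_2', 'prefix/abc_0']
    # A plain string sort would put '_10' before '_2'; the numeric key does not.
    assert sorted(names, key=component_number) == [
        'prefix/abc_0', 'prefix/abc_2', 'prefix/abc_10']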
|
| -
|
| -def _ShouldDoParallelCompositeUpload(logger, allow_splitting, src_url, dst_url,
|
| - file_size, canned_acl=None):
|
| - """Determines whether parallel composite upload strategy should be used.
|
| -
|
| - Args:
|
| - logger: for outputting log messages.
|
| - allow_splitting: If false, then this function returns false.
|
| - src_url: FileUrl corresponding to a local file.
|
| - dst_url: CloudUrl corresponding to destination cloud object.
|
| - file_size: The size of the source file, in bytes.
|
| - canned_acl: Canned ACL to apply to destination object, if any.
|
| -
|
| - Returns:
|
| - True iff a parallel upload should be performed on the source file.
|
| - """
|
| - global suggested_parallel_composites
|
| - parallel_composite_upload_threshold = HumanReadableToBytes(config.get(
|
| - 'GSUtil', 'parallel_composite_upload_threshold',
|
| - DEFAULT_PARALLEL_COMPOSITE_UPLOAD_THRESHOLD))
|
| -
|
| - all_factors_but_size = (
|
| - allow_splitting # Don't split the pieces multiple times.
|
| - and not src_url.IsStream() # We can't partition streams.
|
| - and dst_url.scheme == 'gs' # Compose is only for gs.
|
| - and not canned_acl) # TODO: Implement canned ACL support for compose.
|
| -
|
| -  # Since parallel composite uploads are disabled by default, make the user
|
| -  # aware of them.
|
| - # TODO: Once compiled crcmod is being distributed by major Linux distributions
|
| - # remove this check.
|
| - if (all_factors_but_size and parallel_composite_upload_threshold == 0
|
| - and file_size >= PARALLEL_COMPOSITE_SUGGESTION_THRESHOLD
|
| - and not suggested_parallel_composites):
|
| - logger.info('\n'.join(textwrap.wrap(
|
| - '==> NOTE: You are uploading one or more large file(s), which would '
|
| - 'run significantly faster if you enable parallel composite uploads. '
|
| - 'This feature can be enabled by editing the '
|
| - '"parallel_composite_upload_threshold" value in your .boto '
|
| - 'configuration file. However, note that if you do this you and any '
|
| - 'users that download such composite files will need to have a compiled '
|
| - 'crcmod installed (see "gsutil help crcmod").')) + '\n')
|
| - suggested_parallel_composites = True
|
| -
|
| - return (all_factors_but_size
|
| - and parallel_composite_upload_threshold > 0
|
| - and file_size >= parallel_composite_upload_threshold)
|
| -
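
The size/threshold part of the decision above, as a standalone sketch. The byte parser is a rough stand-in for gslib's HumanReadableToBytes and only handles plain suffixes like '150M':

    _UNITS = {'K': 2 ** 10, 'M': 2 ** 20, 'G': 2 ** 30, 'T': 2 ** 40}

    def human_readable_to_bytes(value):
        value = value.strip().upper()
        if value and value[-1] in _UNITS:
            return int(float(value[:-1]) * _UNITS[value[-1]])
        return int(value)

    def size_allows_parallel_composite(file_size, threshold_setting):
        threshold = human_readable_to_bytes(threshold_setting)
        # A threshold of 0 disables parallel composite uploads entirely.
        return threshold > 0 and file_size >= threshold

    assert size_allows_parallel_composite(200 * 2 ** 20, '150M')
    assert not size_allows_parallel_composite(200 * 2 ** 20, '0')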
|
| -
|
| -def ExpandUrlToSingleBlr(url_str, gsutil_api, debug, project_id,
|
| - treat_nonexistent_object_as_subdir=False):
|
| - """Expands wildcard if present in url_str.
|
| -
|
| - Args:
|
| - url_str: String representation of requested url.
|
| - gsutil_api: gsutil Cloud API instance to use.
|
| - debug: debug level to use (for iterators).
|
| - project_id: project ID to use (for iterators).
|
| -    treat_nonexistent_object_as_subdir: indicates whether to treat a non-existent
|
| - object as a subdir.
|
| -
|
| - Returns:
|
| - (exp_url, have_existing_dst_container)
|
| - where exp_url is a StorageUrl
|
| - and have_existing_dst_container is a bool indicating whether
|
| - exp_url names an existing directory, bucket, or bucket subdirectory.
|
| - In the case where we match a subdirectory AND an object, the
|
| - object is returned.
|
| -
|
| - Raises:
|
| - CommandException: if url_str matched more than 1 URL.
|
| - """
|
| - # Handle wildcarded url case.
|
| - if ContainsWildcard(url_str):
|
| - blr_expansion = list(CreateWildcardIterator(url_str, gsutil_api,
|
| - debug=debug,
|
| - project_id=project_id))
|
| - if len(blr_expansion) != 1:
|
| - raise CommandException('Destination (%s) must match exactly 1 URL' %
|
| - url_str)
|
| - blr = blr_expansion[0]
|
| - # BLR is either an OBJECT, PREFIX, or BUCKET; the latter two represent
|
| - # directories.
|
| - return (StorageUrlFromString(blr.url_string), not blr.IsObject())
|
| -
|
| - storage_url = StorageUrlFromString(url_str)
|
| -
|
| - # Handle non-wildcarded URL.
|
| - if storage_url.IsFileUrl():
|
| - return (storage_url, storage_url.IsDirectory())
|
| -
|
| - # At this point we have a cloud URL.
|
| - if storage_url.IsBucket():
|
| - return (storage_url, True)
|
| -
|
| - # For object/prefix URLs check 3 cases: (a) if the name ends with '/' treat
|
| - # as a subdir; otherwise, use the wildcard iterator with url to
|
| - # find if (b) there's a Prefix matching url, or (c) name is of form
|
| - # dir_$folder$ (and in both these cases also treat dir as a subdir).
|
| - # Cloud subdirs are always considered to be an existing container.
|
| - if IsCloudSubdirPlaceholder(storage_url):
|
| - return (storage_url, True)
|
| -
|
| - # Check for the special case where we have a folder marker object.
|
| - folder_expansion = CreateWildcardIterator(
|
| - storage_url.versionless_url_string + '_$folder$', gsutil_api,
|
| - debug=debug, project_id=project_id).IterAll(
|
| - bucket_listing_fields=['name'])
|
| - for blr in folder_expansion:
|
| - return (storage_url, True)
|
| -
|
| - blr_expansion = CreateWildcardIterator(url_str, gsutil_api,
|
| - debug=debug,
|
| - project_id=project_id).IterAll(
|
| - bucket_listing_fields=['name'])
|
| - expansion_empty = True
|
| - for blr in blr_expansion:
|
| - expansion_empty = False
|
| - if blr.IsPrefix():
|
| - return (storage_url, True)
|
| -
|
| - return (storage_url,
|
| - expansion_empty and treat_nonexistent_object_as_subdir)
|
| -
|
| -
|
| -def FixWindowsNaming(src_url, dst_url):
|
| - """Translates Windows pathnames to cloud pathnames.
|
| -
|
| - Rewrites the destination URL built by ConstructDstUrl().
|
| -
|
| - Args:
|
| - src_url: Source StorageUrl to be copied.
|
| - dst_url: The destination StorageUrl built by ConstructDstUrl().
|
| -
|
| - Returns:
|
| - StorageUrl to use for copy.
|
| - """
|
| - if (src_url.IsFileUrl() and src_url.delim == '\\'
|
| - and dst_url.IsCloudUrl()):
|
| - trans_url_str = re.sub(r'\\', '/', dst_url.url_string)
|
| - dst_url = StorageUrlFromString(trans_url_str)
|
| - return dst_url
|
| -
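
The translation above in isolation, for quick reference (a plain str.replace would work just as well as re.sub here):

    def to_cloud_delims(path):
        # Cloud object names always use '/', regardless of the local OS separator.
        return path.replace('\\', '/')

    assert to_cloud_delims('dir1\\dir2\\file.txt') == 'dir1/dir2/file.txt'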
|
| -
|
| -def SrcDstSame(src_url, dst_url):
|
| - """Checks if src_url and dst_url represent the same object or file.
|
| -
|
| - We don't handle anything about hard or symbolic links.
|
| -
|
| - Args:
|
| - src_url: Source StorageUrl.
|
| - dst_url: Destination StorageUrl.
|
| -
|
| - Returns:
|
| - Bool indicator.
|
| - """
|
| - if src_url.IsFileUrl() and dst_url.IsFileUrl():
|
| - # Translate a/b/./c to a/b/c, so src=dst comparison below works.
|
| - new_src_path = os.path.normpath(src_url.object_name)
|
| - new_dst_path = os.path.normpath(dst_url.object_name)
|
| - return new_src_path == new_dst_path
|
| - else:
|
| - return (src_url.url_string == dst_url.url_string and
|
| - src_url.generation == dst_url.generation)
|
| -
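
A one-line illustration of why the file-to-file branch normalizes paths before comparing:

    import os.path

    # 'a/b/./c' and 'a/b/c' name the same file; normpath makes them compare equal.
    assert os.path.normpath('a/b/./c') == os.path.normpath('a/b/c')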
|
| -
|
| -def _LogCopyOperation(logger, src_url, dst_url, dst_obj_metadata):
|
| - """Logs copy operation, including Content-Type if appropriate.
|
| -
|
| - Args:
|
| - logger: logger instance to use for output.
|
| - src_url: Source StorageUrl.
|
| - dst_url: Destination StorageUrl.
|
| -    dst_obj_metadata: Object-specific metadata that should be overridden during
|
| - the copy.
|
| - """
|
| - if (dst_url.IsCloudUrl() and dst_obj_metadata and
|
| - dst_obj_metadata.contentType):
|
| - content_type_msg = ' [Content-Type=%s]' % dst_obj_metadata.contentType
|
| - else:
|
| - content_type_msg = ''
|
| - if src_url.IsFileUrl() and src_url.IsStream():
|
| - logger.info('Copying from <STDIN>%s...', content_type_msg)
|
| - else:
|
| - logger.info('Copying %s%s...', src_url.url_string, content_type_msg)
|
| -
|
| -
|
| -# pylint: disable=undefined-variable
|
| -def _CopyObjToObjInTheCloud(src_url, src_obj_metadata, dst_url,
|
| - dst_obj_metadata, preconditions, gsutil_api,
|
| - logger):
|
| - """Performs copy-in-the cloud from specified src to dest object.
|
| -
|
| - Args:
|
| - src_url: Source CloudUrl.
|
| - src_obj_metadata: Metadata for source object; must include etag and size.
|
| - dst_url: Destination CloudUrl.
|
| -    dst_obj_metadata: Object-specific metadata that should be overridden during
|
| - the copy.
|
| - preconditions: Preconditions to use for the copy.
|
| - gsutil_api: gsutil Cloud API instance to use for the copy.
|
| - logger: logging.Logger for log message output.
|
| -
|
| - Returns:
|
| - (elapsed_time, bytes_transferred, dst_url with generation,
|
| - md5 hash of destination) excluding overhead like initial GET.
|
| -
|
| - Raises:
|
| - CommandException: if errors encountered.
|
| - """
|
| - start_time = time.time()
|
| -
|
| - progress_callback = FileProgressCallbackHandler(
|
| - ConstructAnnounceText('Copying', dst_url.url_string), logger).call
|
| - if global_copy_helper_opts.test_callback_file:
|
| - with open(global_copy_helper_opts.test_callback_file, 'rb') as test_fp:
|
| - progress_callback = pickle.loads(test_fp.read()).call
|
| - dst_obj = gsutil_api.CopyObject(
|
| - src_obj_metadata, dst_obj_metadata, src_generation=src_url.generation,
|
| - canned_acl=global_copy_helper_opts.canned_acl,
|
| - preconditions=preconditions, progress_callback=progress_callback,
|
| - provider=dst_url.scheme, fields=UPLOAD_RETURN_FIELDS)
|
| -
|
| - end_time = time.time()
|
| -
|
| - result_url = dst_url.Clone()
|
| - result_url.generation = GenerationFromUrlAndString(result_url,
|
| - dst_obj.generation)
|
| -
|
| - return (end_time - start_time, src_obj_metadata.size, result_url,
|
| - dst_obj.md5Hash)
|
| -
|
| -
|
| -def _CheckFreeSpace(path):
|
| - """Return path/drive free space (in bytes)."""
|
| - if IS_WINDOWS:
|
| - # pylint: disable=g-import-not-at-top
|
| - try:
|
| - # pylint: disable=invalid-name
|
| - get_disk_free_space_ex = WINFUNCTYPE(c_int, c_wchar_p,
|
| - POINTER(c_uint64),
|
| - POINTER(c_uint64),
|
| - POINTER(c_uint64))
|
| - get_disk_free_space_ex = get_disk_free_space_ex(
|
| - ('GetDiskFreeSpaceExW', windll.kernel32), (
|
| - (1, 'lpszPathName'),
|
| - (2, 'lpFreeUserSpace'),
|
| - (2, 'lpTotalSpace'),
|
| - (2, 'lpFreeSpace'),))
|
| - except AttributeError:
|
| - get_disk_free_space_ex = WINFUNCTYPE(c_int, c_char_p,
|
| - POINTER(c_uint64),
|
| - POINTER(c_uint64),
|
| - POINTER(c_uint64))
|
| - get_disk_free_space_ex = get_disk_free_space_ex(
|
| - ('GetDiskFreeSpaceExA', windll.kernel32), (
|
| - (1, 'lpszPathName'),
|
| - (2, 'lpFreeUserSpace'),
|
| - (2, 'lpTotalSpace'),
|
| - (2, 'lpFreeSpace'),))
|
| -
|
| - def GetDiskFreeSpaceExErrCheck(result, unused_func, args):
|
| - if not result:
|
| - raise WinError()
|
| - return args[1].value
|
| - get_disk_free_space_ex.errcheck = GetDiskFreeSpaceExErrCheck
|
| -
|
| - return get_disk_free_space_ex(os.getenv('SystemDrive'))
|
| - else:
|
| - (_, f_frsize, _, _, f_bavail, _, _, _, _, _) = os.statvfs(path)
|
| - return f_frsize * f_bavail
|
| -
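
On Python 3 the same check can be done portably with shutil.disk_usage, which avoids both the ctypes dance on Windows and os.statvfs on POSIX; a minimal sketch:

    import shutil

    def free_bytes(path):
        # shutil.disk_usage works on both Windows and POSIX.
        return shutil.disk_usage(path).free

    print(free_bytes('.'))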
|
| -
|
| -def _SetContentTypeFromFile(src_url, dst_obj_metadata):
|
| - """Detects and sets Content-Type if src_url names a local file.
|
| -
|
| - Args:
|
| - src_url: Source StorageUrl.
|
| -    dst_obj_metadata: Object-specific metadata that should be overridden during
|
| - the copy.
|
| - """
|
| - # contentType == '' if user requested default type.
|
| - if (dst_obj_metadata.contentType is None and src_url.IsFileUrl()
|
| - and not src_url.IsStream()):
|
| - # Only do content type recognition if src_url is a file. Object-to-object
|
| - # copies with no -h Content-Type specified re-use the content type of the
|
| - # source object.
|
| - object_name = src_url.object_name
|
| - content_type = None
|
| - # Streams (denoted by '-') are expected to be 'application/octet-stream'
|
| - # and 'file' would partially consume them.
|
| - if object_name != '-':
|
| - if config.getbool('GSUtil', 'use_magicfile', False):
|
| - p = subprocess.Popen(['file', '--mime-type', object_name],
|
| - stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
| - output, error = p.communicate()
|
| - p.stdout.close()
|
| - p.stderr.close()
|
| - if p.returncode != 0 or error:
|
| - raise CommandException(
|
| - 'Encountered error running "file --mime-type %s" '
|
| - '(returncode=%d).\n%s' % (object_name, p.returncode, error))
|
| -        # Parse output by removing the line delimiter and splitting on the last ': '.
|
| - content_type = output.rstrip().rpartition(': ')[2]
|
| - else:
|
| - content_type = mimetypes.guess_type(object_name)[0]
|
| - if not content_type:
|
| - content_type = DEFAULT_CONTENT_TYPE
|
| - dst_obj_metadata.contentType = content_type
|
| -
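
The fallback behavior of the non-magicfile branch above, condensed into a standalone sketch (DEFAULT_CONTENT_TYPE is assumed to be 'application/octet-stream'):

    import mimetypes

    def guess_content_type(object_name, default='application/octet-stream'):
        # mimetypes returns (type, encoding); fall back to the default when the
        # extension is unrecognized, mirroring DEFAULT_CONTENT_TYPE above.
        return mimetypes.guess_type(object_name)[0] or default

    assert guess_content_type('photo.png') == 'image/png'
    assert guess_content_type('archive.unknownext') == 'application/octet-stream'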
|
| -
|
| -# pylint: disable=undefined-variable
|
| -def _UploadFileToObjectNonResumable(src_url, src_obj_filestream,
|
| - src_obj_size, dst_url, dst_obj_metadata,
|
| - preconditions, gsutil_api, logger):
|
| - """Uploads the file using a non-resumable strategy.
|
| -
|
| - Args:
|
| - src_url: Source StorageUrl to upload.
|
| - src_obj_filestream: File pointer to uploadable bytes.
|
| - src_obj_size: Size of the source object.
|
| - dst_url: Destination StorageUrl for the upload.
|
| - dst_obj_metadata: Metadata for the target object.
|
| - preconditions: Preconditions for the upload, if any.
|
| - gsutil_api: gsutil Cloud API instance to use for the upload.
|
| - logger: For outputting log messages.
|
| -
|
| - Returns:
|
| - Elapsed upload time, uploaded Object with generation, md5, and size fields
|
| - populated.
|
| - """
|
| - progress_callback = FileProgressCallbackHandler(
|
| - ConstructAnnounceText('Uploading', dst_url.url_string), logger).call
|
| - if global_copy_helper_opts.test_callback_file:
|
| - with open(global_copy_helper_opts.test_callback_file, 'rb') as test_fp:
|
| - progress_callback = pickle.loads(test_fp.read()).call
|
| - start_time = time.time()
|
| -
|
| - if src_url.IsStream():
|
| - # TODO: gsutil-beta: Provide progress callbacks for streaming uploads.
|
| - uploaded_object = gsutil_api.UploadObjectStreaming(
|
| - src_obj_filestream, object_metadata=dst_obj_metadata,
|
| - canned_acl=global_copy_helper_opts.canned_acl,
|
| - preconditions=preconditions, progress_callback=progress_callback,
|
| - provider=dst_url.scheme, fields=UPLOAD_RETURN_FIELDS)
|
| - else:
|
| - uploaded_object = gsutil_api.UploadObject(
|
| - src_obj_filestream, object_metadata=dst_obj_metadata,
|
| - canned_acl=global_copy_helper_opts.canned_acl, size=src_obj_size,
|
| - preconditions=preconditions, progress_callback=progress_callback,
|
| - provider=dst_url.scheme, fields=UPLOAD_RETURN_FIELDS)
|
| - end_time = time.time()
|
| - elapsed_time = end_time - start_time
|
| -
|
| - return elapsed_time, uploaded_object
|
| -
|
| -
|
| -# pylint: disable=undefined-variable
|
| -def _UploadFileToObjectResumable(src_url, src_obj_filestream,
|
| - src_obj_size, dst_url, dst_obj_metadata,
|
| - preconditions, gsutil_api, logger):
|
| - """Uploads the file using a resumable strategy.
|
| -
|
| - Args:
|
| - src_url: Source FileUrl to upload. Must not be a stream.
|
| - src_obj_filestream: File pointer to uploadable bytes.
|
| - src_obj_size: Size of the source object.
|
| - dst_url: Destination StorageUrl for the upload.
|
| - dst_obj_metadata: Metadata for the target object.
|
| - preconditions: Preconditions for the upload, if any.
|
| - gsutil_api: gsutil Cloud API instance to use for the upload.
|
| - logger: for outputting log messages.
|
| -
|
| - Returns:
|
| - Elapsed upload time, uploaded Object with generation, md5, and size fields
|
| - populated.
|
| - """
|
| - tracker_file_name = GetTrackerFilePath(
|
| - dst_url, TrackerFileType.UPLOAD,
|
| - gsutil_api.GetApiSelector(provider=dst_url.scheme))
|
| -
|
| - def _UploadTrackerCallback(serialization_data):
|
| - """Creates a new tracker file for starting an upload from scratch.
|
| -
|
| - This function is called by the gsutil Cloud API implementation and the
|
| - serialization data is implementation-specific.
|
| -
|
| - Args:
|
| - serialization_data: Serialization data used in resuming the upload.
|
| - """
|
| - tracker_file = None
|
| - try:
|
| - tracker_file = open(tracker_file_name, 'w')
|
| - tracker_file.write(str(serialization_data))
|
| - except IOError as e:
|
| - RaiseUnwritableTrackerFileException(tracker_file_name, e.strerror)
|
| - finally:
|
| - if tracker_file:
|
| - tracker_file.close()
|
| -
|
| - # This contains the upload URL, which will uniquely identify the
|
| - # destination object.
|
| - tracker_data = _GetUploadTrackerData(tracker_file_name, logger)
|
| - if tracker_data:
|
| - logger.info(
|
| - 'Resuming upload for %s', src_url.url_string)
|
| -
|
| - retryable = True
|
| -
|
| - progress_callback = FileProgressCallbackHandler(
|
| - ConstructAnnounceText('Uploading', dst_url.url_string), logger).call
|
| - if global_copy_helper_opts.test_callback_file:
|
| - with open(global_copy_helper_opts.test_callback_file, 'rb') as test_fp:
|
| - progress_callback = pickle.loads(test_fp.read()).call
|
| -
|
| - start_time = time.time()
|
| - num_startover_attempts = 0
|
| - # This loop causes us to retry when the resumable upload failed in a way that
|
| - # requires starting over with a new upload ID. Retries within a single upload
|
| - # ID within the current process are handled in
|
| - # gsutil_api.UploadObjectResumable, and retries within a single upload ID
|
| - # spanning processes happens if an exception occurs not caught below (which
|
| - # will leave the tracker file in place, and cause the upload ID to be reused
|
| - # the next time the user runs gsutil and attempts the same upload).
|
| - while retryable:
|
| - try:
|
| - uploaded_object = gsutil_api.UploadObjectResumable(
|
| - src_obj_filestream, object_metadata=dst_obj_metadata,
|
| - canned_acl=global_copy_helper_opts.canned_acl,
|
| - preconditions=preconditions, provider=dst_url.scheme,
|
| - size=src_obj_size, serialization_data=tracker_data,
|
| - fields=UPLOAD_RETURN_FIELDS,
|
| - tracker_callback=_UploadTrackerCallback,
|
| - progress_callback=progress_callback)
|
| - retryable = False
|
| - except ResumableUploadStartOverException, e:
|
| - # This can happen, for example, if the server sends a 410 response code.
|
| - # In that case the current resumable upload ID can't be reused, so delete
|
| - # the tracker file and try again up to max retries.
|
| - num_startover_attempts += 1
|
| - retryable = (num_startover_attempts < GetNumRetries())
|
| - if not retryable:
|
| - raise
|
| -
|
| - # If the server sends a 404 response code, then the upload should only
|
| - # be restarted if it was the object (and not the bucket) that was missing.
|
| - try:
|
| - gsutil_api.GetBucket(dst_obj_metadata.bucket, provider=dst_url.scheme)
|
| - except NotFoundException:
|
| - raise
|
| -
|
| - logger.info('Restarting upload from scratch after exception %s', e)
|
| - DeleteTrackerFile(tracker_file_name)
|
| - tracker_data = None
|
| - src_obj_filestream.seek(0)
|
| - # Reset the progress callback handler.
|
| - progress_callback = FileProgressCallbackHandler(
|
| - ConstructAnnounceText('Uploading', dst_url.url_string), logger).call
|
| - logger.info('\n'.join(textwrap.wrap(
|
| - 'Resumable upload of %s failed with a response code indicating we '
|
| - 'need to start over with a new resumable upload ID. Backing off '
|
| - 'and retrying.' % src_url.url_string)))
|
| - time.sleep(min(random.random() * (2 ** num_startover_attempts),
|
| - GetMaxRetryDelay()))
|
| - except ResumableUploadAbortException:
|
| - retryable = False
|
| - raise
|
| - finally:
|
| - if not retryable:
|
| - DeleteTrackerFile(tracker_file_name)
|
| -
|
| - end_time = time.time()
|
| - elapsed_time = end_time - start_time
|
| -
|
| - return (elapsed_time, uploaded_object)
|
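| -# The start-over backoff above sleeps for a random fraction of an
|
| -# exponentially growing window, capped by GetMaxRetryDelay(). With an
|
| -# illustrative 32-second cap:
|
| -#
|
| -#   attempt 1: sleep somewhere in [0, 2) seconds
|
| -#   attempt 2: sleep somewhere in [0, 4) seconds
|
| -#   attempt 5+: sleep somewhere in [0, 32) seconds (capped)
|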
| -
|
| -
|
| -def _CompressFileForUpload(src_url, src_obj_filestream, src_obj_size, logger):
|
| - """Compresses a to-be-uploaded local file to save bandwidth.
|
| -
|
| - Args:
|
| - src_url: Source FileUrl.
|
| - src_obj_filestream: Read stream of the source file - will be consumed
|
| - and closed.
|
| - src_obj_size: Size of the source file.
|
| - logger: for outputting log messages.
|
| -
|
| - Returns:
|
| - StorageUrl path to compressed file, compressed file size.
|
| - """
|
| - # TODO: Compress using a streaming model as opposed to all at once here.
|
| - if src_obj_size >= MIN_SIZE_COMPUTE_LOGGING:
|
| - logger.info(
|
| - 'Compressing %s (to tmp)...', src_url)
|
| - (gzip_fh, gzip_path) = tempfile.mkstemp()
|
| - gzip_fp = None
|
| - try:
|
| - # Check for temp space. Assume the compressed object is at most 2x
|
| - # the size of the original (it should normally compress to something
|
| - # smaller than the original).
|
| - if _CheckFreeSpace(gzip_path) < 2*int(src_obj_size):
|
| - raise CommandException('Inadequate temp space available to compress '
|
| - '%s. See the CHANGING TEMP DIRECTORIES section '
|
| - 'of "gsutil help cp" for more info.' % src_url)
|
| - gzip_fp = gzip.open(gzip_path, 'wb')
|
| - data = src_obj_filestream.read(GZIP_CHUNK_SIZE)
|
| - while data:
|
| - gzip_fp.write(data)
|
| - data = src_obj_filestream.read(GZIP_CHUNK_SIZE)
|
| - finally:
|
| - if gzip_fp:
|
| - gzip_fp.close()
|
| - os.close(gzip_fh)
|
| - src_obj_filestream.close()
|
| - gzip_size = os.path.getsize(gzip_path)
|
| - return StorageUrlFromString(gzip_path), gzip_size
|
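| -# The free-space guard above is a conservative worked bound: a 1 GiB
|
| -# source file requires at least 2 GiB free in the temp directory, even
|
| -# though gzip output is normally smaller than its input (and only
|
| -# slightly larger in the worst, already-compressed case).
|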
| -
|
| -
|
| -def _UploadFileToObject(src_url, src_obj_filestream, src_obj_size,
|
| - dst_url, dst_obj_metadata, preconditions, gsutil_api,
|
| - logger, command_obj, copy_exception_handler,
|
| - gzip_exts=None, allow_splitting=True):
|
| - """Uploads a local file to an object.
|
| -
|
| - Args:
|
| - src_url: Source FileUrl.
|
| - src_obj_filestream: Read stream of the source file to be read and closed.
|
| - src_obj_size: Size of the source file.
|
| - dst_url: Destination CloudUrl.
|
| - dst_obj_metadata: Metadata to be applied to the destination object.
|
| - preconditions: Preconditions to use for the copy.
|
| - gsutil_api: gsutil Cloud API to use for the copy.
|
| - logger: for outputting log messages.
|
| - command_obj: command object for use in Apply in parallel composite uploads.
|
| - copy_exception_handler: For handling copy exceptions during Apply.
|
| - gzip_exts: List of file extensions to gzip prior to upload, if any.
|
| - allow_splitting: Whether to allow the file to be split into component
|
| - pieces for a parallel composite upload.
|
| -
|
| - Returns:
|
| - (elapsed_time, bytes_transferred, dst_url with generation,
|
| - md5 hash of destination) excluding overhead like initial GET.
|
| -
|
| - Raises:
|
| - CommandException: if errors encountered.
|
| - """
|
| - if not dst_obj_metadata or not dst_obj_metadata.contentLanguage:
|
| - content_language = config.get_value('GSUtil', 'content_language')
|
| - if content_language:
|
| - dst_obj_metadata.contentLanguage = content_language
|
| -
|
| - fname_parts = src_url.object_name.split('.')
|
| - upload_url = src_url
|
| - upload_stream = src_obj_filestream
|
| - upload_size = src_obj_size
|
| - zipped_file = False
|
| - if gzip_exts and len(fname_parts) > 1 and fname_parts[-1] in gzip_exts:
|
| - upload_url, upload_size = _CompressFileForUpload(
|
| - src_url, src_obj_filestream, src_obj_size, logger)
|
| - upload_stream = open(upload_url.object_name, 'rb')
|
| - dst_obj_metadata.contentEncoding = 'gzip'
|
| - zipped_file = True
|
| -
|
| - elapsed_time = None
|
| - uploaded_object = None
|
| - hash_algs = GetUploadHashAlgs()
|
| - digesters = dict((alg, hash_algs[alg]()) for alg in hash_algs or {})
|
| -
|
| - parallel_composite_upload = _ShouldDoParallelCompositeUpload(
|
| - logger, allow_splitting, upload_url, dst_url, src_obj_size,
|
| - canned_acl=global_copy_helper_opts.canned_acl)
|
| -
|
| - if (src_url.IsStream() and
|
| - gsutil_api.GetApiSelector(provider=dst_url.scheme) == ApiSelector.JSON):
|
| - orig_stream = upload_stream
|
| - # Add limited seekable properties to the stream via buffering.
|
| - upload_stream = ResumableStreamingJsonUploadWrapper(
|
| - orig_stream, GetJsonResumableChunkSize())
|
| -
|
| - if not parallel_composite_upload and len(hash_algs):
|
| - # Parallel composite uploads calculate hashes per-component in subsequent
|
| - # calls to this function, but the composition of the final object is a
|
| - # cloud-only operation.
|
| - wrapped_filestream = HashingFileUploadWrapper(upload_stream, digesters,
|
| - hash_algs, upload_url, logger)
|
| - else:
|
| - wrapped_filestream = upload_stream
|
| -
|
| - try:
|
| - if parallel_composite_upload:
|
| - elapsed_time, uploaded_object = _DoParallelCompositeUpload(
|
| - upload_stream, upload_url, dst_url, dst_obj_metadata,
|
| - global_copy_helper_opts.canned_acl, upload_size, preconditions,
|
| - gsutil_api, command_obj, copy_exception_handler)
|
| - elif upload_size < ResumableThreshold() or src_url.IsStream():
|
| - elapsed_time, uploaded_object = _UploadFileToObjectNonResumable(
|
| - upload_url, wrapped_filestream, upload_size, dst_url,
|
| - dst_obj_metadata, preconditions, gsutil_api, logger)
|
| - else:
|
| - elapsed_time, uploaded_object = _UploadFileToObjectResumable(
|
| - upload_url, wrapped_filestream, upload_size, dst_url,
|
| - dst_obj_metadata, preconditions, gsutil_api, logger)
|
| -
|
| - finally:
|
| - if zipped_file:
|
| - try:
|
| - os.unlink(upload_url.object_name)
|
| - # Windows sometimes complains the temp file is locked when you try to
|
| - # delete it.
|
| - except Exception: # pylint: disable=broad-except
|
| - logger.warning(
|
| - 'Could not delete %s. This can occur in Windows because the '
|
| - 'temporary file is still locked.', upload_url.object_name)
|
| - # In the gzip case, this is the gzip stream. _CompressFileForUpload will
|
| - # have already closed the original source stream.
|
| - upload_stream.close()
|
| -
|
| - if not parallel_composite_upload:
|
| - try:
|
| - digests = _CreateDigestsFromDigesters(digesters)
|
| - _CheckHashes(logger, dst_url, uploaded_object, src_url.object_name,
|
| - digests, is_upload=True)
|
| - except HashMismatchException:
|
| - if _RENAME_ON_HASH_MISMATCH:
|
| - corrupted_obj_metadata = apitools_messages.Object(
|
| - name=dst_obj_metadata.name,
|
| - bucket=dst_obj_metadata.bucket,
|
| - etag=uploaded_object.etag)
|
| - dst_obj_metadata.name = (dst_url.object_name +
|
| - _RENAME_ON_HASH_MISMATCH_SUFFIX)
|
| - gsutil_api.CopyObject(corrupted_obj_metadata,
|
| - dst_obj_metadata, provider=dst_url.scheme)
|
| - # If the digest doesn't match, delete the object.
|
| - gsutil_api.DeleteObject(dst_url.bucket_name, dst_url.object_name,
|
| - generation=uploaded_object.generation,
|
| - provider=dst_url.scheme)
|
| - raise
|
| -
|
| - result_url = dst_url.Clone()
|
| -
|
| - result_url.generation = GenerationFromUrlAndString(
|
| - result_url, uploaded_object.generation)
|
| -
|
| - return (elapsed_time, uploaded_object.size, result_url,
|
| - uploaded_object.md5Hash)
|
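| -# A minimal sketch of the extension test used above; the file names and
|
| -# the gzip_exts value are examples only (e.g. from "cp -z htm,html"):
|
| -#
|
| -#   'index.html'.split('.')[-1] in ['htm', 'html']  # True -> compress
|
| -#   'photo.jpg'.split('.')[-1] in ['htm', 'html']   # False -> as-is
|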
| -
|
| -
|
| -# TODO: Refactor this long function into smaller pieces.
|
| -# pylint: disable=too-many-statements
|
| -def _DownloadObjectToFile(src_url, src_obj_metadata, dst_url,
|
| - gsutil_api, logger, test_method=None):
|
| - """Downloads an object to a local file.
|
| -
|
| - Args:
|
| - src_url: Source CloudUrl.
|
| - src_obj_metadata: Metadata from the source object.
|
| - dst_url: Destination FileUrl.
|
| - gsutil_api: gsutil Cloud API instance to use for the download.
|
| - logger: for outputting log messages.
|
| - test_method: Optional test method for modifying the file before validation
|
| - during unit tests.
|
| - Returns:
|
| - (elapsed_time, bytes_transferred, dst_url, md5), excluding overhead like
|
| - initial GET.
|
| -
|
| - Raises:
|
| - CommandException: if errors encountered.
|
| - """
|
| - global open_files_map
|
| - file_name = dst_url.object_name
|
| - dir_name = os.path.dirname(file_name)
|
| - if dir_name and not os.path.exists(dir_name):
|
| - # Do dir creation in try block so can ignore case where dir already
|
| - # exists. This is needed to avoid a race condition when running gsutil
|
| - # -m cp.
|
| - try:
|
| - os.makedirs(dir_name)
|
| - except OSError, e:
|
| - if e.errno != errno.EEXIST:
|
| - raise
|
| - api_selector = gsutil_api.GetApiSelector(provider=src_url.scheme)
|
| - # For gzipped objects download to a temp file and unzip. For the XML API,
|
| - # this represents the result of a HEAD request. For the JSON API, this is
|
| - # the stored encoding which the service may not respect. However, if the
|
| - # server sends decompressed bytes for a file that is stored compressed
|
| - # (double compressed case), there is no way we can validate the hash and
|
| - # we will fail our hash check for the object.
|
| - if (src_obj_metadata.contentEncoding and
|
| - src_obj_metadata.contentEncoding.lower().endswith('gzip')):
|
| - # We can't use tempfile.mkstemp() here because we need a predictable
|
| - # filename for resumable downloads.
|
| - download_file_name = _GetDownloadZipFileName(file_name)
|
| - logger.info(
|
| - 'Downloading to temp gzip filename %s', download_file_name)
|
| - need_to_unzip = True
|
| - else:
|
| - download_file_name = file_name
|
| - need_to_unzip = False
|
| -
|
| - if download_file_name.endswith(dst_url.delim):
|
| - logger.warn('\n'.join(textwrap.wrap(
|
| - 'Skipping attempt to download to filename ending with slash (%s). This '
|
| - 'typically happens when using gsutil to download from a subdirectory '
|
| - 'created by the Cloud Console (https://cloud.google.com/console)'
|
| - % download_file_name)))
|
| - return (0, 0, dst_url, '')
|
| -
|
| - # Set up hash digesters.
|
| - hash_algs = GetDownloadHashAlgs(
|
| - logger, src_has_md5=src_obj_metadata.md5Hash,
|
| - src_has_crc32c=src_obj_metadata.crc32c)
|
| - digesters = dict((alg, hash_algs[alg]()) for alg in hash_algs or {})
|
| -
|
| - fp = None
|
| - # Tracks whether the server used a gzip encoding.
|
| - server_encoding = None
|
| - download_complete = False
|
| - download_strategy = _SelectDownloadStrategy(dst_url)
|
| - download_start_point = 0
|
| - # This is used for resuming downloads, but also for passing the mediaLink
|
| - # and size into the download for new downloads so that we can avoid
|
| - # making an extra HTTP call.
|
| - serialization_data = None
|
| - serialization_dict = GetDownloadSerializationDict(src_obj_metadata)
|
| - open_files = []
|
| - try:
|
| - if download_strategy is CloudApi.DownloadStrategy.ONE_SHOT:
|
| - fp = open(download_file_name, 'wb')
|
| - elif download_strategy is CloudApi.DownloadStrategy.RESUMABLE:
|
| - # If this is a resumable download, we need to open the file for append and
|
| - # manage a tracker file.
|
| - if open_files_map.get(download_file_name, False):
|
| - # Ensure another process/thread is not already writing to this file.
|
| - raise FileConcurrencySkipError
|
| - open_files.append(download_file_name)
|
| - open_files_map[download_file_name] = True
|
| - fp = open(download_file_name, 'ab')
|
| -
|
| - resuming = ReadOrCreateDownloadTrackerFile(
|
| - src_obj_metadata, dst_url, api_selector)
|
| - if resuming:
|
| - # Find out how far along we are so we can request the appropriate
|
| - # remaining range of the object.
|
| - existing_file_size = GetFileSize(fp, position_to_eof=True)
|
| - if existing_file_size > src_obj_metadata.size:
|
| - DeleteTrackerFile(GetTrackerFilePath(
|
| - dst_url, TrackerFileType.DOWNLOAD, api_selector))
|
| - raise CommandException(
|
| - '%s is larger (%d) than %s (%d).\nDeleting tracker file, so '
|
| - 'if you re-try this download it will start from scratch' %
|
| - (download_file_name, existing_file_size, src_url.object_name,
|
| - src_obj_metadata.size))
|
| - else:
|
| - if existing_file_size == src_obj_metadata.size:
|
| - logger.info(
|
| - 'Download already complete for file %s, skipping download but '
|
| - 'will run integrity checks.', download_file_name)
|
| - download_complete = True
|
| - else:
|
| - download_start_point = existing_file_size
|
| - serialization_dict['progress'] = download_start_point
|
| - logger.info('Resuming download for %s', src_url.url_string)
|
| - # Catch up our digester with the hash data.
|
| - if existing_file_size > TEN_MIB:
|
| - for alg_name in digesters:
|
| - logger.info(
|
| - 'Catching up %s for %s', alg_name, download_file_name)
|
| - with open(download_file_name, 'rb') as hash_fp:
|
| - while True:
|
| - data = hash_fp.read(DEFAULT_FILE_BUFFER_SIZE)
|
| - if not data:
|
| - break
|
| - for alg_name in digesters:
|
| - digesters[alg_name].update(data)
|
| - else:
|
| - # Starting a new download, blow away whatever is already there.
|
| - fp.truncate(0)
|
| - fp.seek(0)
|
| -
|
| - else:
|
| - raise CommandException('Invalid download strategy %s chosen for '
|
| - 'file %s' % (download_strategy, fp.name))
|
| -
|
| - if not dst_url.IsStream():
|
| - serialization_data = json.dumps(serialization_dict)
|
| -
|
| - progress_callback = FileProgressCallbackHandler(
|
| - ConstructAnnounceText('Downloading', dst_url.url_string),
|
| - logger).call
|
| - if global_copy_helper_opts.test_callback_file:
|
| - with open(global_copy_helper_opts.test_callback_file, 'rb') as test_fp:
|
| - progress_callback = pickle.loads(test_fp.read()).call
|
| -
|
| - start_time = time.time()
|
| - # TODO: With gzip encoding (which may occur on-the-fly and not be part of
|
| - # the object's metadata), when we request a range to resume, it's possible
|
| - # that the server will just resend the entire object, which means our
|
| - # caught-up hash will be incorrect. We recalculate the hash on
|
| - # the local file in the case of a failed gzip hash anyway, but it would
|
| - # be better if we actively detected this case.
|
| - if not download_complete:
|
| - server_encoding = gsutil_api.GetObjectMedia(
|
| - src_url.bucket_name, src_url.object_name, fp,
|
| - start_byte=download_start_point, generation=src_url.generation,
|
| - object_size=src_obj_metadata.size,
|
| - download_strategy=download_strategy, provider=src_url.scheme,
|
| - serialization_data=serialization_data, digesters=digesters,
|
| - progress_callback=progress_callback)
|
| -
|
| - end_time = time.time()
|
| -
|
| - # If a custom test method is defined, call it here. For the copy command,
|
| - # test methods are expected to take one argument: an open file pointer,
|
| - # and are used to perturb the open file during download to exercise
|
| - # download error detection.
|
| - if test_method:
|
| - test_method(fp)
|
| -
|
| - except ResumableDownloadException as e:
|
| - logger.warning('Caught ResumableDownloadException (%s) for file %s.',
|
| - e.reason, file_name)
|
| - raise
|
| -
|
| - finally:
|
| - if fp:
|
| - fp.close()
|
| - for file_name in open_files:
|
| - open_files_map.delete(file_name)
|
| -
|
| - # If we decompressed a content-encoding gzip file on the fly, this may not
|
| - # be accurate, but it is the best we can do without going deep into the
|
| - # underlying HTTP libraries. Note that this value is only used for
|
| - # reporting in log messages; inaccuracy doesn't impact the integrity of the
|
| - # download.
|
| - bytes_transferred = src_obj_metadata.size - download_start_point
|
| - server_gzip = server_encoding and server_encoding.lower().endswith('gzip')
|
| - local_md5 = _ValidateDownloadHashes(logger, src_url, src_obj_metadata,
|
| - dst_url, need_to_unzip, server_gzip,
|
| - digesters, hash_algs, api_selector,
|
| - bytes_transferred)
|
| -
|
| - return (end_time - start_time, bytes_transferred, dst_url, local_md5)
|
| -
|
| -
|
| -def _GetDownloadZipFileName(file_name):
|
| - """Returns the file name for a temporarily compressed downloaded file."""
|
| - return '%s_.gztmp' % file_name
|
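| -# For example, a download destined for 'movie.mpg' (an illustrative name)
|
| -# is first written to 'movie.mpg_.gztmp' and decompressed into
|
| -# 'movie.mpg' once the transfer completes.
|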
| -
|
| -
|
| -def _ValidateDownloadHashes(logger, src_url, src_obj_metadata, dst_url,
|
| - need_to_unzip, server_gzip, digesters, hash_algs,
|
| - api_selector, bytes_transferred):
|
| - """Validates a downloaded file's integrity.
|
| -
|
| - Args:
|
| - logger: For outputting log messages.
|
| - src_url: StorageUrl for the source object.
|
| - src_obj_metadata: Metadata for the source object, potentially containing
|
| - hash values.
|
| - dst_url: StorageUrl describing the destination file.
|
| - need_to_unzip: If true, a temporary zip file was used and must be
|
| - uncompressed as part of validation.
|
| - server_gzip: If true, the server gzipped the bytes (regardless of whether
|
| - the object metadata claimed it was gzipped).
|
| - digesters: dict of {string, hash digester} that contains up-to-date digests
|
| - computed during the download. If a digester for a particular
|
| - algorithm is None, an up-to-date digest is not available and the
|
| - hash must be recomputed from the local file.
|
| - hash_algs: dict of {string, hash algorithm} that can be used if digesters
|
| - don't have up-to-date digests.
|
| - api_selector: The Cloud API implementation used (used for tracker file naming).
|
| - bytes_transferred: Number of bytes downloaded (used for logging).
|
| -
|
| - Returns:
|
| - An MD5 of the local file, if one was calculated as part of the integrity
|
| - check.
|
| - """
|
| - file_name = dst_url.object_name
|
| - download_file_name = (_GetDownloadZipFileName(file_name) if need_to_unzip else
|
| - file_name)
|
| - digesters_succeeded = True
|
| - for alg in digesters:
|
| - # If we get a digester with a None algorithm, the underlying
|
| - # implementation failed to calculate a digest, so we will need to
|
| - # calculate one from scratch.
|
| - if not digesters[alg]:
|
| - digesters_succeeded = False
|
| - break
|
| -
|
| - if digesters_succeeded:
|
| - local_hashes = _CreateDigestsFromDigesters(digesters)
|
| - else:
|
| - local_hashes = _CreateDigestsFromLocalFile(
|
| - logger, hash_algs, download_file_name, src_obj_metadata)
|
| -
|
| - digest_verified = True
|
| - hash_invalid_exception = None
|
| - try:
|
| - _CheckHashes(logger, src_url, src_obj_metadata, download_file_name,
|
| - local_hashes)
|
| - DeleteTrackerFile(GetTrackerFilePath(
|
| - dst_url, TrackerFileType.DOWNLOAD, api_selector))
|
| - except HashMismatchException, e:
|
| - # If a non-gzipped object gets sent with gzip content encoding, the hash
|
| - # we calculate will match the gzipped bytes, not the original object. Thus,
|
| - # we'll need to calculate and check it after unzipping.
|
| - if server_gzip:
|
| - logger.debug(
|
| - 'Hash did not match but server gzipped the content, will '
|
| - 'recalculate.')
|
| - digest_verified = False
|
| - elif api_selector == ApiSelector.XML:
|
| - logger.debug(
|
| - 'Hash did not match but server may have gzipped the content, will '
|
| - 'recalculate.')
|
| - # Save off the exception in case this isn't a gzipped file.
|
| - hash_invalid_exception = e
|
| - digest_verified = False
|
| - else:
|
| - DeleteTrackerFile(GetTrackerFilePath(
|
| - dst_url, TrackerFileType.DOWNLOAD, api_selector))
|
| - if _RENAME_ON_HASH_MISMATCH:
|
| - os.rename(download_file_name,
|
| - download_file_name + _RENAME_ON_HASH_MISMATCH_SUFFIX)
|
| - else:
|
| - os.unlink(download_file_name)
|
| - raise
|
| -
|
| - if server_gzip and not need_to_unzip:
|
| - # Server compressed bytes on-the-fly, thus we need to rename and decompress.
|
| - # We can't decompress on-the-fly because prior to Python 3.2 the gzip
|
| - # module makes a bunch of seek calls on the stream.
|
| - download_file_name = _GetDownloadZipFileName(file_name)
|
| - os.rename(file_name, download_file_name)
|
| -
|
| - if need_to_unzip or server_gzip:
|
| - # Log that we're uncompressing if the file is big enough that
|
| - # decompressing would make it look like the transfer "stalled" at the end.
|
| - if bytes_transferred > TEN_MIB:
|
| - logger.info(
|
| - 'Uncompressing downloaded tmp file to %s...', file_name)
|
| -
|
| - # Downloaded gzipped file to a filename w/o .gz extension, so unzip.
|
| - gzip_fp = None
|
| - try:
|
| - gzip_fp = gzip.open(download_file_name, 'rb')
|
| - with open(file_name, 'wb') as f_out:
|
| - data = gzip_fp.read(GZIP_CHUNK_SIZE)
|
| - while data:
|
| - f_out.write(data)
|
| - data = gzip_fp.read(GZIP_CHUNK_SIZE)
|
| - except IOError, e:
|
| - # In the XML case where we don't know if the file was gzipped, raise
|
| - # the original hash exception if we find that it wasn't.
|
| - if 'Not a gzipped file' in str(e) and hash_invalid_exception:
|
| - # Linter improperly thinks we're raising None despite the above check.
|
| - # pylint: disable=raising-bad-type
|
| - raise hash_invalid_exception
|
| - finally:
|
| - if gzip_fp:
|
| - gzip_fp.close()
|
| -
|
| - os.unlink(download_file_name)
|
| -
|
| - if not digest_verified:
|
| - try:
|
| - # Recalculate hashes on the unzipped local file.
|
| - local_hashes = _CreateDigestsFromLocalFile(logger, hash_algs, file_name,
|
| - src_obj_metadata)
|
| - _CheckHashes(logger, src_url, src_obj_metadata, file_name, local_hashes)
|
| - DeleteTrackerFile(GetTrackerFilePath(
|
| - dst_url, TrackerFileType.DOWNLOAD, api_selector))
|
| - except HashMismatchException:
|
| - DeleteTrackerFile(GetTrackerFilePath(
|
| - dst_url, TrackerFileType.DOWNLOAD, api_selector))
|
| - if _RENAME_ON_HASH_MISMATCH:
|
| - os.rename(file_name,
|
| - file_name + _RENAME_ON_HASH_MISMATCH_SUFFIX)
|
| - else:
|
| - os.unlink(file_name)
|
| - raise
|
| -
|
| - if 'md5' in local_hashes:
|
| - return local_hashes['md5']
|
| -
|
| -
|
| -def _CopyFileToFile(src_url, dst_url):
|
| - """Copies a local file to a local file.
|
| -
|
| - Args:
|
| - src_url: Source FileUrl.
|
| - dst_url: Destination FileUrl.
|
| - Returns:
|
| - (elapsed_time, bytes_transferred, dst_url, md5=None).
|
| -
|
| - Raises:
|
| - CommandException: if errors encountered.
|
| - """
|
| - src_fp = GetStreamFromFileUrl(src_url)
|
| - dir_name = os.path.dirname(dst_url.object_name)
|
| - if dir_name and not os.path.exists(dir_name):
|
| - os.makedirs(dir_name)
|
| - dst_fp = open(dst_url.object_name, 'wb')
|
| - start_time = time.time()
|
| - shutil.copyfileobj(src_fp, dst_fp)
|
| - end_time = time.time()
|
| - return (end_time - start_time, os.path.getsize(dst_url.object_name),
|
| - dst_url, None)
|
| -
|
| -
|
| -def _DummyTrackerCallback(_):
|
| - pass
|
| -
|
| -
|
| -# pylint: disable=undefined-variable
|
| -def _CopyObjToObjDaisyChainMode(src_url, src_obj_metadata, dst_url,
|
| - dst_obj_metadata, preconditions, gsutil_api,
|
| - logger):
|
| - """Copies from src_url to dst_url in "daisy chain" mode.
|
| -
|
| - See -D OPTION documentation about what daisy chain mode is.
|
| -
|
| - Args:
|
| - src_url: Source CloudUrl
|
| - src_obj_metadata: Metadata from source object
|
| - dst_url: Destination CloudUrl
|
| - dst_obj_metadata: Object-specific metadata that should be overridden during
|
| - the copy.
|
| - preconditions: Preconditions to use for the copy.
|
| - gsutil_api: gsutil Cloud API to use for the copy.
|
| - logger: For outputting log messages.
|
| -
|
| - Returns:
|
| - (elapsed_time, bytes_transferred, dst_url with generation,
|
| - md5 hash of destination) excluding overhead like initial GET.
|
| -
|
| - Raises:
|
| - CommandException: if errors encountered.
|
| - """
|
| - # We don't attempt to preserve ACLs across providers because
|
| - # GCS and S3 support different ACLs and disjoint principals.
|
| - if (global_copy_helper_opts.preserve_acl
|
| - and src_url.scheme != dst_url.scheme):
|
| - raise NotImplementedError(
|
| - 'Cross-provider cp -p not supported')
|
| - if not global_copy_helper_opts.preserve_acl:
|
| - dst_obj_metadata.acl = []
|
| -
|
| - # Don't use callbacks for downloads on the daisy chain wrapper because
|
| - # upload callbacks will output progress, but respect test hooks if present.
|
| - progress_callback = None
|
| - if global_copy_helper_opts.test_callback_file:
|
| - with open(global_copy_helper_opts.test_callback_file, 'rb') as test_fp:
|
| - progress_callback = pickle.loads(test_fp.read()).call
|
| -
|
| - start_time = time.time()
|
| - upload_fp = DaisyChainWrapper(src_url, src_obj_metadata.size, gsutil_api,
|
| - progress_callback=progress_callback)
|
| - uploaded_object = None
|
| - if src_obj_metadata.size == 0:
|
| - # Resumable uploads of size 0 are not supported.
|
| - uploaded_object = gsutil_api.UploadObject(
|
| - upload_fp, object_metadata=dst_obj_metadata,
|
| - canned_acl=global_copy_helper_opts.canned_acl,
|
| - preconditions=preconditions, provider=dst_url.scheme,
|
| - fields=UPLOAD_RETURN_FIELDS, size=src_obj_metadata.size)
|
| - else:
|
| - # TODO: Support process-break resumes. This will resume across connection
|
| - # breaks and server errors, but the tracker callback is a no-op so this
|
| - # won't resume across gsutil runs.
|
| - # TODO: Test retries via test_callback_file.
|
| - uploaded_object = gsutil_api.UploadObjectResumable(
|
| - upload_fp, object_metadata=dst_obj_metadata,
|
| - canned_acl=global_copy_helper_opts.canned_acl,
|
| - preconditions=preconditions, provider=dst_url.scheme,
|
| - fields=UPLOAD_RETURN_FIELDS, size=src_obj_metadata.size,
|
| - progress_callback=FileProgressCallbackHandler(
|
| - ConstructAnnounceText('Uploading', dst_url.url_string),
|
| - logger).call,
|
| - tracker_callback=_DummyTrackerCallback)
|
| - end_time = time.time()
|
| -
|
| - try:
|
| - _CheckCloudHashes(logger, src_url, dst_url, src_obj_metadata,
|
| - uploaded_object)
|
| - except HashMismatchException:
|
| - if _RENAME_ON_HASH_MISMATCH:
|
| - corrupted_obj_metadata = apitools_messages.Object(
|
| - name=dst_obj_metadata.name,
|
| - bucket=dst_obj_metadata.bucket,
|
| - etag=uploaded_object.etag)
|
| - dst_obj_metadata.name = (dst_url.object_name +
|
| - _RENAME_ON_HASH_MISMATCH_SUFFIX)
|
| - gsutil_api.CopyObject(corrupted_obj_metadata,
|
| - dst_obj_metadata, provider=dst_url.scheme)
|
| - # If the digest doesn't match, delete the object.
|
| - gsutil_api.DeleteObject(dst_url.bucket_name, dst_url.object_name,
|
| - generation=uploaded_object.generation,
|
| - provider=dst_url.scheme)
|
| - raise
|
| -
|
| - result_url = dst_url.Clone()
|
| - result_url.generation = GenerationFromUrlAndString(
|
| - result_url, uploaded_object.generation)
|
| -
|
| - return (end_time - start_time, src_obj_metadata.size, result_url,
|
| - uploaded_object.md5Hash)
|
| -
|
| -
|
| -# pylint: disable=undefined-variable
|
| -# pylint: disable=too-many-statements
|
| -def PerformCopy(logger, src_url, dst_url, gsutil_api, command_obj,
|
| - copy_exception_handler, allow_splitting=True,
|
| - headers=None, manifest=None, gzip_exts=None, test_method=None):
|
| - """Performs copy from src_url to dst_url, handling various special cases.
|
| -
|
| - Args:
|
| - logger: for outputting log messages.
|
| - src_url: Source StorageUrl.
|
| - dst_url: Destination StorageUrl.
|
| - gsutil_api: gsutil Cloud API instance to use for the copy.
|
| - command_obj: command object for use in Apply in parallel composite uploads.
|
| - copy_exception_handler: for handling copy exceptions during Apply.
|
| - allow_splitting: Whether to allow the file to be split into component
|
| - pieces for a parallel composite upload.
|
| - headers: optional headers to use for the copy operation.
|
| - manifest: optional manifest for tracking copy operations.
|
| - gzip_exts: List of file extensions to gzip for uploads, if any.
|
| - test_method: optional test method for modifying files during unit tests.
|
| -
|
| - Returns:
|
| - (elapsed_time, bytes_transferred, version-specific dst_url) excluding
|
| - overhead like initial GET.
|
| -
|
| - Raises:
|
| - ItemExistsError: if no clobber flag is specified and the destination
|
| - object already exists.
|
| - SkipUnsupportedObjectError: if skip_unsupported_objects flag is specified
|
| - and the source is an unsupported type.
|
| - CommandException: if other errors encountered.
|
| - """
|
| - if headers:
|
| - dst_obj_headers = headers.copy()
|
| - else:
|
| - dst_obj_headers = {}
|
| -
|
| - # Create a metadata instance for each destination object so metadata
|
| - # such as content-type can be applied per-object.
|
| - # Initialize metadata from any headers passed in via -h.
|
| - dst_obj_metadata = ObjectMetadataFromHeaders(dst_obj_headers)
|
| -
|
| - if dst_url.IsCloudUrl() and dst_url.scheme == 'gs':
|
| - preconditions = PreconditionsFromHeaders(dst_obj_headers)
|
| - else:
|
| - preconditions = Preconditions()
|
| -
|
| - src_obj_metadata = None
|
| - src_obj_filestream = None
|
| - if src_url.IsCloudUrl():
|
| - src_obj_fields = None
|
| - if dst_url.IsCloudUrl():
|
| - # For cloud or daisy chain copy, we need every copyable field.
|
| - # If we're not modifying or overriding any of the fields, we can get
|
| - # away without retrieving the object metadata because the copy
|
| - # operation can succeed with just the destination bucket and object
|
| - # name. But if we are sending any metadata, the JSON API will expect a
|
| - # complete object resource. Since we want metadata like the object size
|
| - # for our own tracking, we just get all of the metadata here.
|
| - src_obj_fields = ['cacheControl', 'componentCount',
|
| - 'contentDisposition', 'contentEncoding',
|
| - 'contentLanguage', 'contentType', 'crc32c',
|
| - 'etag', 'generation', 'md5Hash', 'mediaLink',
|
| - 'metadata', 'metageneration', 'size']
|
| - # We only need the ACL if we're going to preserve it.
|
| - if global_copy_helper_opts.preserve_acl:
|
| - src_obj_fields.append('acl')
|
| - if (src_url.scheme == dst_url.scheme
|
| - and not global_copy_helper_opts.daisy_chain):
|
| - copy_in_the_cloud = True
|
| - else:
|
| - copy_in_the_cloud = False
|
| - else:
|
| - # Just get the fields needed to validate the download.
|
| - src_obj_fields = ['crc32c', 'contentEncoding', 'contentType', 'etag',
|
| - 'mediaLink', 'md5Hash', 'size']
|
| -
|
| - if (src_url.scheme == 's3' and
|
| - global_copy_helper_opts.skip_unsupported_objects):
|
| - src_obj_fields.append('storageClass')
|
| -
|
| - try:
|
| - src_generation = GenerationFromUrlAndString(src_url, src_url.generation)
|
| - src_obj_metadata = gsutil_api.GetObjectMetadata(
|
| - src_url.bucket_name, src_url.object_name,
|
| - generation=src_generation, provider=src_url.scheme,
|
| - fields=src_obj_fields)
|
| - except NotFoundException:
|
| - raise CommandException(
|
| - 'NotFoundException: Could not retrieve source object %s.' %
|
| - src_url.url_string)
|
| - if (src_url.scheme == 's3' and
|
| - global_copy_helper_opts.skip_unsupported_objects and
|
| - src_obj_metadata.storageClass == 'GLACIER'):
|
| - raise SkipGlacierError()
|
| -
|
| - src_obj_size = src_obj_metadata.size
|
| - dst_obj_metadata.contentType = src_obj_metadata.contentType
|
| - if global_copy_helper_opts.preserve_acl:
|
| - dst_obj_metadata.acl = src_obj_metadata.acl
|
| - # Special case for S3-to-S3 copy URLs using
|
| - # global_copy_helper_opts.preserve_acl.
|
| - # dst_url will be verified in _CopyObjToObjDaisyChainMode if it
|
| - # is not s3 (and thus differs from src_url).
|
| - if src_url.scheme == 's3':
|
| - acl_text = S3MarkerAclFromObjectMetadata(src_obj_metadata)
|
| - if acl_text:
|
| - AddS3MarkerAclToObjectMetadata(dst_obj_metadata, acl_text)
|
| - else:
|
| - try:
|
| - src_obj_filestream = GetStreamFromFileUrl(src_url)
|
| - except Exception, e: # pylint: disable=broad-except
|
| - raise CommandException('Error opening file "%s": %s.' % (src_url,
|
| - e.message))
|
| - if src_url.IsStream():
|
| - src_obj_size = None
|
| - else:
|
| - src_obj_size = os.path.getsize(src_url.object_name)
|
| -
|
| - if global_copy_helper_opts.use_manifest:
|
| - # Set the source size in the manifest.
|
| - manifest.Set(src_url.url_string, 'size', src_obj_size)
|
| -
|
| - if (dst_url.scheme == 's3' and src_obj_size > S3_MAX_UPLOAD_SIZE
|
| - and src_url.scheme != 's3'):
|
| - raise CommandException(
|
| - '"%s" exceeds the maximum gsutil-supported size for an S3 upload. S3 '
|
| - 'objects greater than %s in size require multipart uploads, which '
|
| - 'gsutil does not support.' % (src_url,
|
| - MakeHumanReadable(S3_MAX_UPLOAD_SIZE)))
|
| -
|
| - # On Windows, stdin is opened as text mode instead of binary which causes
|
| - # problems when piping a binary file, so this switches it to binary mode.
|
| - if IS_WINDOWS and src_url.IsFileUrl() and src_url.IsStream():
|
| - msvcrt.setmode(GetStreamFromFileUrl(src_url).fileno(), os.O_BINARY)
|
| -
|
| - if global_copy_helper_opts.no_clobber:
|
| - # There are two checks to prevent clobbering:
|
| - # 1) The first check is to see if the URL
|
| - # already exists at the destination and prevent the upload/download
|
| - # from happening. This is done by the exists() call.
|
| - # 2) The second check is only relevant if we are writing to gs. We can
|
| - # enforce that the server only writes the object if it doesn't exist
|
| - # by specifying the header below. This check only happens at the
|
| - # server after the complete file has been uploaded. We specify this
|
| - # header to prevent a race condition where a destination file may
|
| - # be created after the first check and before the file is fully
|
| - # uploaded.
|
| - # In order to save on unnecessary uploads/downloads we perform both
|
| - # checks. However, this may come at the cost of additional HTTP calls.
|
| - if preconditions.gen_match:
|
| - raise ArgumentException('Specifying x-goog-if-generation-match is '
|
| - 'not supported with cp -n')
|
| - else:
|
| - preconditions.gen_match = 0
|
| - if dst_url.IsFileUrl() and os.path.exists(dst_url.object_name):
|
| - # The local file may be a partial. Check the file sizes.
|
| - if src_obj_size == os.path.getsize(dst_url.object_name):
|
| - raise ItemExistsError()
|
| - elif dst_url.IsCloudUrl():
|
| - try:
|
| - dst_object = gsutil_api.GetObjectMetadata(
|
| - dst_url.bucket_name, dst_url.object_name, provider=dst_url.scheme)
|
| - except NotFoundException:
|
| - dst_object = None
|
| - if dst_object:
|
| - raise ItemExistsError()
|
| -
|
| - if dst_url.IsCloudUrl():
|
| - # Cloud storage API gets object and bucket name from metadata.
|
| - dst_obj_metadata.name = dst_url.object_name
|
| - dst_obj_metadata.bucket = dst_url.bucket_name
|
| - if src_url.IsCloudUrl():
|
| - # Preserve relevant metadata from the source object if it's not already
|
| - # provided from the headers.
|
| - CopyObjectMetadata(src_obj_metadata, dst_obj_metadata, override=False)
|
| - src_obj_metadata.name = src_url.object_name
|
| - src_obj_metadata.bucket = src_url.bucket_name
|
| - else:
|
| - _SetContentTypeFromFile(src_url, dst_obj_metadata)
|
| - else:
|
| - # Files don't have Cloud API metadata.
|
| - dst_obj_metadata = None
|
| -
|
| - _LogCopyOperation(logger, src_url, dst_url, dst_obj_metadata)
|
| -
|
| - if src_url.IsCloudUrl():
|
| - if dst_url.IsFileUrl():
|
| - return _DownloadObjectToFile(src_url, src_obj_metadata, dst_url,
|
| - gsutil_api, logger, test_method=test_method)
|
| - elif copy_in_the_cloud:
|
| - return _CopyObjToObjInTheCloud(src_url, src_obj_metadata, dst_url,
|
| - dst_obj_metadata, preconditions,
|
| - gsutil_api, logger)
|
| - else:
|
| - return _CopyObjToObjDaisyChainMode(src_url, src_obj_metadata,
|
| - dst_url, dst_obj_metadata,
|
| - preconditions, gsutil_api, logger)
|
| - else: # src_url.IsFileUrl()
|
| - if dst_url.IsCloudUrl():
|
| - return _UploadFileToObject(
|
| - src_url, src_obj_filestream, src_obj_size, dst_url,
|
| - dst_obj_metadata, preconditions, gsutil_api, logger, command_obj,
|
| - copy_exception_handler, gzip_exts=gzip_exts,
|
| - allow_splitting=allow_splitting)
|
| - else: # dst_url.IsFileUrl()
|
| - return _CopyFileToFile(src_url, dst_url)
|
| -
|
| -
|
| -class Manifest(object):
|
| - """Stores the manifest items for the CpCommand class."""
|
| -
|
| - def __init__(self, path):
|
| - # self.items contains a dictionary of rows
|
| - self.items = {}
|
| - self.manifest_filter = {}
|
| - self.lock = CreateLock()
|
| -
|
| - self.manifest_path = os.path.expanduser(path)
|
| - self._ParseManifest()
|
| - self._CreateManifestFile()
|
| -
|
| - def _ParseManifest(self):
|
| - """Load and parse a manifest file.
|
| -
|
| - This information will be used to skip any files that have a skip or OK
|
| - status.
|
| - """
|
| - try:
|
| - if os.path.exists(self.manifest_path):
|
| - with open(self.manifest_path, 'rb') as f:
|
| - first_row = True
|
| - reader = csv.reader(f)
|
| - for row in reader:
|
| - if first_row:
|
| - try:
|
| - source_index = row.index('Source')
|
| - result_index = row.index('Result')
|
| - except ValueError:
|
| - # No header and thus not a valid manifest file.
|
| - raise CommandException(
|
| - 'Missing headers in manifest file: %s' % self.manifest_path)
|
| - first_row = False
|
| - source = row[source_index]
|
| - result = row[result_index]
|
| - if result in ['OK', 'skip']:
|
| - # We're always guaranteed to take the last result of a specific
|
| - # source url.
|
| - self.manifest_filter[source] = result
|
| - except IOError:
|
| - raise CommandException('Could not parse %s' % self.manifest_path)
|
| -
|
| - def WasSuccessful(self, src):
|
| - """Returns whether the specified src url was marked as successful."""
|
| - return src in self.manifest_filter
|
| -
|
| - def _CreateManifestFile(self):
|
| - """Opens the manifest file and assigns it to the file pointer."""
|
| - try:
|
| - if ((not os.path.exists(self.manifest_path))
|
| - or (os.stat(self.manifest_path).st_size == 0)):
|
| - # Add headers to the new file.
|
| - with open(self.manifest_path, 'wb', 1) as f:
|
| - writer = csv.writer(f)
|
| - writer.writerow(['Source',
|
| - 'Destination',
|
| - 'Start',
|
| - 'End',
|
| - 'Md5',
|
| - 'UploadId',
|
| - 'Source Size',
|
| - 'Bytes Transferred',
|
| - 'Result',
|
| - 'Description'])
|
| - except IOError:
|
| - raise CommandException('Could not create manifest file.')
|
| -
|
| - def Set(self, url, key, value):
|
| - if value is None:
|
| - # In case we don't have any information to set we bail out here.
|
| - # This is so that we don't clobber existing information.
|
| - # To clear a value, pass '' instead of None.
|
| - return
|
| - if url in self.items:
|
| - self.items[url][key] = value
|
| - else:
|
| - self.items[url] = {key: value}
|
| -
|
| - def Initialize(self, source_url, destination_url):
|
| - # Always use the source_url as the key for the item. This is unique.
|
| - self.Set(source_url, 'source_uri', source_url)
|
| - self.Set(source_url, 'destination_uri', destination_url)
|
| - self.Set(source_url, 'start_time', datetime.datetime.utcnow())
|
| -
|
| - def SetResult(self, source_url, bytes_transferred, result,
|
| - description=''):
|
| - self.Set(source_url, 'bytes', bytes_transferred)
|
| - self.Set(source_url, 'result', result)
|
| - self.Set(source_url, 'description', description)
|
| - self.Set(source_url, 'end_time', datetime.datetime.utcnow())
|
| - self._WriteRowToManifestFile(source_url)
|
| - self._RemoveItemFromManifest(source_url)
|
| -
|
| - def _WriteRowToManifestFile(self, url):
|
| - """Writes a manifest entry to the manifest file for the url argument."""
|
| - row_item = self.items[url]
|
| - data = [
|
| - str(row_item['source_uri'].encode(UTF8)),
|
| - str(row_item['destination_uri'].encode(UTF8)),
|
| - '%sZ' % row_item['start_time'].isoformat(),
|
| - '%sZ' % row_item['end_time'].isoformat(),
|
| - row_item['md5'] if 'md5' in row_item else '',
|
| - row_item['upload_id'] if 'upload_id' in row_item else '',
|
| - str(row_item['size']) if 'size' in row_item else '',
|
| - str(row_item['bytes']) if 'bytes' in row_item else '',
|
| - row_item['result'],
|
| - row_item['description'].encode(UTF8)]
|
| -
|
| - # Acquire a lock to prevent multiple threads writing to the same file at
|
| - # the same time. This would cause a garbled mess in the manifest file.
|
| - with self.lock:
|
| - with open(self.manifest_path, 'a', 1) as f: # 1 == line buffered
|
| - writer = csv.writer(f)
|
| - writer.writerow(data)
|
| -
|
| - def _RemoveItemFromManifest(self, url):
|
| - # Remove the item from the dictionary since we're done with it and
|
| - # we don't want the dictionary to grow too large in memory for no good
|
| - # reason.
|
| - del self.items[url]
|
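| -# An illustrative manifest row written by SetResult(), lined up with the
|
| -# header columns created in _CreateManifestFile (all values made up; the
|
| -# writer emits it as a single CSV line):
|
| -#
|
| -#   /tmp/obj,gs://bucket/obj,2015-01-01T00:00:00Z,2015-01-01T00:00:05Z,
|
| -#   <md5>,,1048576,1048576,OK,
|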
| -
|
| -
|
| -class ItemExistsError(Exception):
|
| - """Exception class for objects that are skipped because they already exist."""
|
| - pass
|
| -
|
| -
|
| -class SkipUnsupportedObjectError(Exception):
|
| - """Exception for objects skipped because they are an unsupported type."""
|
| -
|
| - def __init__(self):
|
| - super(SkipUnsupportedObjectError, self).__init__()
|
| - self.unsupported_type = 'Unknown'
|
| -
|
| -
|
| -class SkipGlacierError(SkipUnsupportedObjectError):
|
| - """Exception for objects skipped because they are an unsupported type."""
|
| -
|
| - def __init__(self):
|
| - super(SkipGlacierError, self).__init__()
|
| - self.unsupported_type = 'GLACIER'
|
| -
|
| -
|
| -def GetPathBeforeFinalDir(url):
|
| - """Returns the path section before the final directory component of the URL.
|
| -
|
| - This handles cases for file system directories, buckets, and bucket
|
| - subdirectories. Example: for gs://bucket/dir/ we'll return 'gs://bucket',
|
| - and for file://dir we'll return file://
|
| -
|
| - Args:
|
| - url: StorageUrl representing a filesystem directory, cloud bucket or
|
| - bucket subdir.
|
| -
|
| - Returns:
|
| - String name of above-described path, sans final path separator.
|
| - """
|
| - sep = url.delim
|
| - if url.IsFileUrl():
|
| - past_scheme = url.url_string[len('file://'):]
|
| - if past_scheme.find(sep) == -1:
|
| - return 'file://'
|
| - else:
|
| - return 'file://%s' % past_scheme.rstrip(sep).rpartition(sep)[0]
|
| - if url.IsBucket():
|
| - return '%s://' % url.scheme
|
| - # Else it names a bucket subdir.
|
| - return url.url_string.rstrip(sep).rpartition(sep)[0]
|
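| -# A few more worked examples with illustrative URLs: gs://bucket/dir/sub/
|
| -# maps to gs://bucket/dir, gs://bucket maps to gs://, and file://tmp/dir
|
| -# maps to file://tmp.
|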
| -
|
| -
|
| -def _DivideAndCeil(dividend, divisor):
|
| - """Returns ceil(dividend / divisor).
|
| -
|
| - Takes care to avoid the pitfalls of floating point arithmetic that could
|
| - otherwise yield the wrong result for large numbers.
|
| -
|
| - Args:
|
| - dividend: Dividend for the operation.
|
| - divisor: Divisor for the operation.
|
| -
|
| - Returns:
|
| - Quotient.
|
| - """
|
| - quotient = dividend // divisor
|
| - if (dividend % divisor) != 0:
|
| - quotient += 1
|
| - return quotient
|
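| -# Worked examples: _DivideAndCeil(10, 5) == 2 and _DivideAndCeil(11, 5) == 3.
|
| -# Integer division plus a remainder check stays exact even for dividends
|
| -# above 2**53, where math.ceil(float(dividend) / divisor) could round
|
| -# incorrectly.
|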
| -
|
| -
|
| -def _GetPartitionInfo(file_size, max_components, default_component_size):
|
| - """Gets info about a file partition for parallel composite uploads.
|
| -
|
| - Args:
|
| - file_size: The number of bytes in the file to be partitioned.
|
| - max_components: The maximum number of components that can be composed.
|
| - default_component_size: The size of a component, assuming that
|
| - max_components is infinite.
|
| - Returns:
|
| - The number of components in the partitioned file, and the size of each
|
| - component (except the last, which will have a different size iff
|
| - file_size != 0 (mod num_components)).
|
| - """
|
| - # num_components = ceil(file_size / default_component_size)
|
| - num_components = _DivideAndCeil(file_size, default_component_size)
|
| -
|
| - # num_components must be in the range [2, max_components]
|
| - num_components = max(min(num_components, max_components), 2)
|
| -
|
| - # component_size = ceil(file_size / num_components)
|
| - component_size = _DivideAndCeil(file_size, num_components)
|
| - return (num_components, component_size)
|
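| -# Worked example (sizes in MiB are illustrative): a 150 MiB file with a
|
| -# 50 MiB default component size and max_components = 32 yields
|
| -# num_components = 3 and component_size = 50 MiB; a 10 MiB file is
|
| -# clamped up to num_components = 2, giving 5 MiB components.
|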
| -
|
| -
|
| -def _DeleteObjectFn(cls, url_to_delete, thread_state=None):
|
| - """Wrapper function to be used with command.Apply()."""
|
| - gsutil_api = GetCloudApiInstance(cls, thread_state)
|
| - gsutil_api.DeleteObject(
|
| - url_to_delete.bucket_name, url_to_delete.object_name,
|
| - generation=url_to_delete.generation, provider=url_to_delete.scheme)
|
| -
|
| -
|
| -def _ParseParallelUploadTrackerFile(tracker_file, tracker_file_lock):
|
| - """Parse the tracker file from the last parallel composite upload attempt.
|
| -
|
| - If it exists, the tracker file is of the format described in
|
| - _CreateParallelUploadTrackerFile. If the file doesn't exist or cannot be
|
| - read, then the upload will start from the beginning.
|
| -
|
| - Args:
|
| - tracker_file: The name of the file to parse.
|
| - tracker_file_lock: Lock protecting access to the tracker file.
|
| -
|
| - Returns:
|
| - random_prefix: A randomly-generated prefix to the name of the
|
| - temporary components.
|
| - existing_objects: A list of ObjectFromTracker objects representing
|
| - the set of files that have already been uploaded.
|
| - """
|
| -
|
| - def GenerateRandomPrefix():
|
| - return str(random.randint(1, (10 ** 10) - 1))
|
| -
|
| - existing_objects = []
|
| - try:
|
| - with tracker_file_lock:
|
| - with open(tracker_file, 'r') as fp:
|
| - lines = fp.readlines()
|
| - lines = [line.strip() for line in lines]
|
| - if not lines:
|
| - print('Parallel upload tracker file (%s) was invalid. '
|
| - 'Restarting upload from scratch.' % tracker_file)
|
| - lines = [GenerateRandomPrefix()]
|
| -
|
| - except IOError as e:
|
| - # We can't read the tracker file, so generate a new random prefix.
|
| - lines = [GenerateRandomPrefix()]
|
| -
|
| - # Ignore non-existent file (happens first time an upload
|
| - # is attempted on a file), but warn user for other errors.
|
| - if e.errno != errno.ENOENT:
|
| - # Will restart because we failed to read in the file.
|
| - print('Couldn\'t read parallel upload tracker file (%s): %s. '
|
| - 'Restarting upload from scratch.' % (tracker_file, e.strerror))
|
| -
|
| - # The first line contains the randomly-generated prefix.
|
| - random_prefix = lines[0]
|
| -
|
| - # The remaining lines were written in pairs to describe a single component
|
| - # in the form:
|
| - # object_name (without random prefix)
|
| - # generation
|
| - # Newlines are used as the delimiter because only newlines and carriage
|
| - # returns are invalid characters in object names, and users can specify
|
| - # a custom prefix in the config file.
|
| - i = 1
|
| - while i < len(lines):
|
| - (name, generation) = (lines[i], lines[i+1])
|
| - if not generation:
|
| - # Cover the '' case.
|
| - generation = None
|
| - existing_objects.append(ObjectFromTracker(name, generation))
|
| - i += 2
|
| - return (random_prefix, existing_objects)
|
| -
|
| -
|
| -def _AppendComponentTrackerToParallelUploadTrackerFile(tracker_file, component,
|
| - tracker_file_lock):
|
| - """Appends info about the uploaded component to an existing tracker file.
|
| -
|
| - Follows the format described in _CreateParallelUploadTrackerFile.
|
| -
|
| - Args:
|
| - tracker_file: Tracker file to append to.
|
| - component: Component that was uploaded.
|
| - tracker_file_lock: Thread and process-safe Lock for the tracker file.
|
| - """
|
| - lines = _GetParallelUploadTrackerFileLinesForComponents([component])
|
| - lines = [line + '\n' for line in lines]
|
| - with tracker_file_lock:
|
| - with open(tracker_file, 'a') as f:
|
| - f.writelines(lines)
|
| -
|
| -
|
| -def _CreateParallelUploadTrackerFile(tracker_file, random_prefix, components,
|
| - tracker_file_lock):
|
| - """Writes information about components that were successfully uploaded.
|
| -
|
| - This way the upload can be resumed at a later date. The tracker file has
|
| - the format:
|
| - random_prefix
|
| - temp_object_1_name
|
| - temp_object_1_generation
|
| - .
|
| - .
|
| - .
|
| - temp_object_N_name
|
| - temp_object_N_generation
|
| - where N is the number of components that have been successfully uploaded.
|
| -
|
| - Args:
|
| - tracker_file: The name of the parallel upload tracker file.
|
| - random_prefix: The randomly-generated prefix that was used for
|
| - uploading any existing components.
|
| - components: A list of ObjectFromTracker objects that were uploaded.
|
| - tracker_file_lock: The lock protecting access to the tracker file.
|
| - """
|
| - lines = [random_prefix]
|
| - lines += _GetParallelUploadTrackerFileLinesForComponents(components)
|
| - lines = [line + '\n' for line in lines]
|
| - try:
|
| - with tracker_file_lock:
|
| - open(tracker_file, 'w').close() # Clear the file.
|
| - with open(tracker_file, 'w') as f:
|
| - f.writelines(lines)
|
| - except IOError as e:
|
| - RaiseUnwritableTrackerFileException(tracker_file, e.strerror)
|
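| -# A concrete (illustrative) tracker file for two already-uploaded
|
| -# components, following the format documented above; the prefix, names
|
| -# and generations are made up:
|
| -#
|
| -#   7104852612
|
| -#   my_file.bin_0
|
| -#   1450000000000001
|
| -#   my_file.bin_1
|
| -#   1450000000000002
|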
| -
|
| -
|
| -def _GetParallelUploadTrackerFileLinesForComponents(components):
|
| - """Return a list of the lines for use in a parallel upload tracker file.
|
| -
|
| - The lines represent the given components, using the format as described in
|
| - _CreateParallelUploadTrackerFile.
|
| -
|
| - Args:
|
| - components: A list of ObjectFromTracker objects that were uploaded.
|
| -
|
| - Returns:
|
| - Lines describing components with their generation for outputting to the
|
| - tracker file.
|
| - """
|
| - lines = []
|
| - for component in components:
|
| - generation = component.generation
|
| - if not generation:
|
| - generation = ''
|
| - lines += [component.object_name, str(generation)]
|
| - return lines
|
| -
|
| -
|
| -def FilterExistingComponents(dst_args, existing_components, bucket_url,
|
| - gsutil_api):
|
| - """Determines course of action for component objects.
|
| -
|
| - Given the list of all target objects based on partitioning the file and
|
| - the list of objects that have already been uploaded successfully,
|
| - this function determines which objects should be uploaded, which
|
| - existing components are still valid, and which existing components should
|
| - be deleted.
|
| -
|
| - Args:
|
| - dst_args: The map of file_name -> PerformParallelUploadFileToObjectArgs
|
| - calculated by partitioning the file.
|
| - existing_components: A list of ObjectFromTracker objects that have been
|
| - uploaded in the past.
|
| - bucket_url: CloudUrl of the bucket in which the components exist.
|
| - gsutil_api: gsutil Cloud API instance to use for retrieving object metadata.
|
| -
|
| - Returns:
|
| - components_to_upload: List of components that need to be uploaded.
|
| - uploaded_components: List of components that have already been
|
| - uploaded and are still valid.
|
| - existing_objects_to_delete: List of components that have already
|
| - been uploaded, but are no longer valid
|
| - and are in a versioned bucket, and
|
| - therefore should be deleted.
|
| - """
|
| - components_to_upload = []
|
| - existing_component_names = [component.object_name
|
| - for component in existing_components]
|
| - for component_name in dst_args:
|
| - if component_name not in existing_component_names:
|
| - components_to_upload.append(dst_args[component_name])
|
| -
|
| - objects_already_chosen = []
|
| -
|
| - # Don't reuse any temporary components whose MD5 doesn't match the current
|
| - # MD5 of the corresponding part of the file. If the bucket is versioned,
|
| - # also make sure that we delete the existing temporary version.
|
| - existing_objects_to_delete = []
|
| - uploaded_components = []
|
| - for tracker_object in existing_components:
|
| - if (tracker_object.object_name not in dst_args
|
| - or tracker_object.object_name in objects_already_chosen):
|
| - # This could happen if the component size has changed. This also serves
|
| - # to handle object names that get duplicated in the tracker file due
|
| - # to people doing things they shouldn't (e.g., overwriting an existing
|
| - # temporary component in a versioned bucket).
|
| -
|
| - url = bucket_url.Clone()
|
| - url.object_name = tracker_object.object_name
|
| - url.generation = tracker_object.generation
|
| - existing_objects_to_delete.append(url)
|
| - continue
|
| -
|
| - dst_arg = dst_args[tracker_object.object_name]
|
| - file_part = FilePart(dst_arg.filename, dst_arg.file_start,
|
| - dst_arg.file_length)
|
| - # TODO: calculate MD5's in parallel when possible.
|
| - content_md5 = CalculateB64EncodedMd5FromContents(file_part)
|
| -
|
| - try:
|
| - # Get the MD5 of the currently-existing component.
|
| - dst_url = dst_arg.dst_url
|
| - dst_metadata = gsutil_api.GetObjectMetadata(
|
| - dst_url.bucket_name, dst_url.object_name,
|
| - generation=dst_url.generation, provider=dst_url.scheme,
|
| - fields=['md5Hash', 'etag'])
|
| - cloud_md5 = dst_metadata.md5Hash
|
| - except Exception: # pylint: disable=broad-except
|
| - # We don't actually care what went wrong - we couldn't retrieve the
|
| - # object to check the MD5, so just upload it again.
|
| - cloud_md5 = None
|
| -
|
| - if cloud_md5 != content_md5:
|
| - components_to_upload.append(dst_arg)
|
| - objects_already_chosen.append(tracker_object.object_name)
|
| - # Stale components need explicit deletion only when they carry a
|
| - # generation (i.e., they live in a versioned bucket); without one, the
|
| - # new upload simply overwrites the old object.
|
| - if tracker_object.generation:
|
| - invalid_component_with_generation = dst_arg.dst_url.Clone()
|
| - invalid_component_with_generation.generation = tracker_object.generation
|
| - existing_objects_to_delete.append(invalid_component_with_generation)
|
| - else:
|
| - url = dst_arg.dst_url.Clone()
|
| - url.generation = tracker_object.generation
|
| - uploaded_components.append(url)
|
| - objects_already_chosen.append(tracker_object.object_name)
|
| -
|
| - if uploaded_components:
|
| - logging.info('Found %d existing temporary components to reuse.',
|
| - len(uploaded_components))
|
| -
|
| - return (components_to_upload, uploaded_components,
|
| - existing_objects_to_delete)
|
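| # [Editor's illustration, not part of the original module.] A simplified,
| # self-contained sketch of the reuse-or-reupload decision above: the base64
| # MD5 of the local file slice is compared with the hash recorded for the
| # already-uploaded component. Stand-in values replace FilePart, the Cloud API
| # metadata call, and the real hashing helper; everything here is hypothetical.
| import base64
| import hashlib
| import tempfile
|
| _tmp = tempfile.NamedTemporaryFile(delete=False)
| _tmp.write(b'0123456789abcdef')             # pretend local file contents
| _tmp.close()
|
| def _b64_md5_of_slice(path, start, length):
|   # Base64-encoded MD5 of one slice of the file (what one component holds).
|   with open(path, 'rb') as f:
|     f.seek(start)
|     return base64.b64encode(hashlib.md5(f.read(length)).digest())
|
| _local_md5 = _b64_md5_of_slice(_tmp.name, 0, 8)   # this component's slice
| _cloud_md5 = _b64_md5_of_slice(_tmp.name, 0, 8)   # pretend cloud metadata value
| _reuse_component = (_cloud_md5 == _local_md5)     # equal -> reuse, else re-upload
| assert _reuse_component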
|
|