Index: tools/telemetry/third_party/gsutil/gslib/copy_helper.py |
diff --git a/tools/telemetry/third_party/gsutil/gslib/copy_helper.py b/tools/telemetry/third_party/gsutil/gslib/copy_helper.py |
deleted file mode 100644 |
index 456b15cf47a011e3c5b71a426c46dffeb520a833..0000000000000000000000000000000000000000 |
--- a/tools/telemetry/third_party/gsutil/gslib/copy_helper.py |
+++ /dev/null |
@@ -1,2787 +0,0 @@ |
-# -*- coding: utf-8 -*- |
-# Copyright 2011 Google Inc. All Rights Reserved. |
-# Copyright 2011, Nexenta Systems Inc. |
-# |
-# Licensed under the Apache License, Version 2.0 (the "License"); |
-# you may not use this file except in compliance with the License. |
-# You may obtain a copy of the License at |
-# |
-# http://www.apache.org/licenses/LICENSE-2.0 |
-# |
-# Unless required by applicable law or agreed to in writing, software |
-# distributed under the License is distributed on an "AS IS" BASIS, |
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
-# See the License for the specific language governing permissions and |
-# limitations under the License. |
-"""Helper functions for copy functionality.""" |
- |
-from __future__ import absolute_import |
- |
-import base64 |
-from collections import namedtuple |
-import csv |
-import datetime |
-import errno |
-import gzip |
-from hashlib import md5 |
-import json |
-import logging |
-import mimetypes |
-import multiprocessing |
-import os |
-import pickle |
-import random |
-import re |
-import shutil |
-import stat |
-import subprocess |
-import tempfile |
-import textwrap |
-import time |
-import traceback |
- |
-from boto import config |
-import crcmod |
- |
-import gslib |
-from gslib.cloud_api import ArgumentException |
-from gslib.cloud_api import CloudApi |
-from gslib.cloud_api import NotFoundException |
-from gslib.cloud_api import PreconditionException |
-from gslib.cloud_api import Preconditions |
-from gslib.cloud_api import ResumableDownloadException |
-from gslib.cloud_api import ResumableUploadAbortException |
-from gslib.cloud_api import ResumableUploadException |
-from gslib.cloud_api import ResumableUploadStartOverException |
-from gslib.cloud_api_helper import GetDownloadSerializationDict |
-from gslib.commands.compose import MAX_COMPOSE_ARITY |
-from gslib.commands.config import DEFAULT_PARALLEL_COMPOSITE_UPLOAD_COMPONENT_SIZE |
-from gslib.commands.config import DEFAULT_PARALLEL_COMPOSITE_UPLOAD_THRESHOLD |
-from gslib.cs_api_map import ApiSelector |
-from gslib.daisy_chain_wrapper import DaisyChainWrapper |
-from gslib.exception import CommandException |
-from gslib.exception import HashMismatchException |
-from gslib.file_part import FilePart |
-from gslib.hashing_helper import Base64EncodeHash |
-from gslib.hashing_helper import CalculateB64EncodedMd5FromContents |
-from gslib.hashing_helper import CalculateHashesFromContents |
-from gslib.hashing_helper import GetDownloadHashAlgs |
-from gslib.hashing_helper import GetUploadHashAlgs |
-from gslib.hashing_helper import HashingFileUploadWrapper |
-from gslib.parallelism_framework_util import ThreadAndProcessSafeDict |
-from gslib.parallelism_framework_util import ThreadSafeDict |
-from gslib.progress_callback import ConstructAnnounceText |
-from gslib.progress_callback import FileProgressCallbackHandler |
-from gslib.progress_callback import ProgressCallbackWithBackoff |
-from gslib.resumable_streaming_upload import ResumableStreamingJsonUploadWrapper |
-from gslib.storage_url import ContainsWildcard |
-from gslib.storage_url import StorageUrlFromString |
-from gslib.third_party.storage_apitools import storage_v1_messages as apitools_messages |
-from gslib.tracker_file import DeleteTrackerFile |
-from gslib.tracker_file import GetTrackerFilePath |
-from gslib.tracker_file import RaiseUnwritableTrackerFileException |
-from gslib.tracker_file import ReadOrCreateDownloadTrackerFile |
-from gslib.tracker_file import TrackerFileType |
-from gslib.translation_helper import AddS3MarkerAclToObjectMetadata |
-from gslib.translation_helper import CopyObjectMetadata |
-from gslib.translation_helper import DEFAULT_CONTENT_TYPE |
-from gslib.translation_helper import GenerationFromUrlAndString |
-from gslib.translation_helper import ObjectMetadataFromHeaders |
-from gslib.translation_helper import PreconditionsFromHeaders |
-from gslib.translation_helper import S3MarkerAclFromObjectMetadata |
-from gslib.util import CreateLock |
-from gslib.util import DEFAULT_FILE_BUFFER_SIZE |
-from gslib.util import GetCloudApiInstance |
-from gslib.util import GetFileSize |
-from gslib.util import GetJsonResumableChunkSize |
-from gslib.util import GetMaxRetryDelay |
-from gslib.util import GetNumRetries |
-from gslib.util import GetStreamFromFileUrl |
-from gslib.util import HumanReadableToBytes |
-from gslib.util import IS_WINDOWS |
-from gslib.util import IsCloudSubdirPlaceholder |
-from gslib.util import MakeHumanReadable |
-from gslib.util import MIN_SIZE_COMPUTE_LOGGING |
-from gslib.util import MultiprocessingIsAvailable |
-from gslib.util import ResumableThreshold |
-from gslib.util import TEN_MIB |
-from gslib.util import UTF8 |
-from gslib.wildcard_iterator import CreateWildcardIterator |
- |
-# pylint: disable=g-import-not-at-top |
-if IS_WINDOWS: |
- import msvcrt |
- from ctypes import c_int |
- from ctypes import c_uint64 |
- from ctypes import c_char_p |
- from ctypes import c_wchar_p |
- from ctypes import windll |
- from ctypes import POINTER |
- from ctypes import WINFUNCTYPE |
- from ctypes import WinError |
- |
-# Declare copy_helper_opts as a global because namedtuple isn't aware of |
-# assigning to a class member (which breaks pickling done by multiprocessing). |
-# For details see |
-# http://stackoverflow.com/questions/16377215/how-to-pickle-a-namedtuple-instance-correctly |
-# Similarly can't pickle logger. |
-# pylint: disable=global-at-module-level |
-global global_copy_helper_opts, global_logger |
- |
-# In-memory map of local files that are currently opened for write. Used to |
-# ensure that if we write to the same file twice (say, for example, because the |
-# user specified two identical source URLs), the writes occur serially. |
-global open_files_map |
-open_files_map = ( |
- ThreadSafeDict() if (IS_WINDOWS or not MultiprocessingIsAvailable()[0]) |
- else ThreadAndProcessSafeDict(multiprocessing.Manager())) |
- |
-# For debugging purposes; if True, files and objects that fail hash validation |
-# will be saved with the below suffix appended. |
-_RENAME_ON_HASH_MISMATCH = False |
-_RENAME_ON_HASH_MISMATCH_SUFFIX = '_corrupt' |
- |
-PARALLEL_UPLOAD_TEMP_NAMESPACE = ( |
- u'/gsutil/tmp/parallel_composite_uploads/for_details_see/gsutil_help_cp/') |
- |
-PARALLEL_UPLOAD_STATIC_SALT = u""" |
-PARALLEL_UPLOAD_SALT_TO_PREVENT_COLLISIONS. |
-The theory is that no user will have prepended this to the front of |
-one of their object names and then done an MD5 hash of the name, and |
-then prepended PARALLEL_UPLOAD_TEMP_NAMESPACE to the front of their object |
-name. Note that there will be no problems with object name length since we |
-hash the original name. |
-""" |
- |
-# When uploading a file, get the following fields in the response for |
-# filling in command output and manifests. |
-UPLOAD_RETURN_FIELDS = ['crc32c', 'etag', 'generation', 'md5Hash', 'size'] |
- |
-# This tuple is used only to encapsulate the arguments needed for |
-# command.Apply() in the parallel composite upload case. |
-# Note that content_type is used instead of a full apitools Object() because |
-# apitools objects are not picklable. |
-# filename: String name of file. |
-# file_start: start byte of file (may be in the middle of a file for partitioned |
-# files). |
-# file_length: length of upload (may not be the entire length of a file for |
-# partitioned files). |
-# src_url: FileUrl describing the source file. |
-# dst_url: CloudUrl describing the destination component file. |
-# canned_acl: canned_acl to apply to the uploaded file/component. |
-# content_type: content-type for final object, used for setting content-type |
-# of components and final object. |
-# tracker_file: tracker file for this component. |
-# tracker_file_lock: tracker file lock for tracker file(s). |
-PerformParallelUploadFileToObjectArgs = namedtuple( |
- 'PerformParallelUploadFileToObjectArgs', |
- 'filename file_start file_length src_url dst_url canned_acl ' |
- 'content_type tracker_file tracker_file_lock') |
- |
-ObjectFromTracker = namedtuple('ObjectFromTracker', |
- 'object_name generation') |
- |
-# TODO: Refactor this file to be less cumbersome. In particular, some of the |
-# different paths (e.g., uploading a file to an object vs. downloading an |
-# object to a file) could be split into separate files. |
- |
-# Chunk size to use while zipping/unzipping gzip files. |
-GZIP_CHUNK_SIZE = 8192 |
- |
-PARALLEL_COMPOSITE_SUGGESTION_THRESHOLD = 150 * 1024 * 1024 |
- |
-# S3 requires special Multipart upload logic (that we currently don't implement) |
-# for files > 5GiB in size. |
-S3_MAX_UPLOAD_SIZE = 5 * 1024 * 1024 * 1024 |
- |
-suggested_parallel_composites = False |
- |
- |
-class FileConcurrencySkipError(Exception): |
- """Raised when skipping a file due to a concurrent, duplicate copy.""" |
- |
- |
-def _RmExceptionHandler(cls, e): |
- """Simple exception handler to allow post-completion status.""" |
- cls.logger.error(str(e)) |
- |
- |
-def _ParallelUploadCopyExceptionHandler(cls, e): |
- """Simple exception handler to allow post-completion status.""" |
- cls.logger.error(str(e)) |
- cls.op_failure_count += 1 |
- cls.logger.debug('\n\nEncountered exception while copying:\n%s\n', |
- traceback.format_exc()) |
- |
- |
-def _PerformParallelUploadFileToObject(cls, args, thread_state=None): |
- """Function argument to Apply for performing parallel composite uploads. |
- |
- Args: |
- cls: Calling Command class. |
- args: PerformParallelUploadFileToObjectArgs tuple describing the target. |
- thread_state: gsutil Cloud API instance to use for the operation. |
- |
- Returns: |
- StorageUrl representing a successfully uploaded component. |
- """ |
- fp = FilePart(args.filename, args.file_start, args.file_length) |
- gsutil_api = GetCloudApiInstance(cls, thread_state=thread_state) |
- with fp: |
- # We take many precautions with the component names that make collisions |
- # effectively impossible. Specifying preconditions will just allow us to |
- # reach a state in which uploads will always fail on retries. |
- preconditions = None |
- |
- # Fill in content type if one was provided. |
- dst_object_metadata = apitools_messages.Object( |
- name=args.dst_url.object_name, |
- bucket=args.dst_url.bucket_name, |
- contentType=args.content_type) |
- |
- try: |
- if global_copy_helper_opts.canned_acl: |
- # No canned ACL support in JSON, force XML API to be used for |
- # upload/copy operations. |
- orig_prefer_api = gsutil_api.prefer_api |
- gsutil_api.prefer_api = ApiSelector.XML |
- ret = _UploadFileToObject(args.src_url, fp, args.file_length, |
- args.dst_url, dst_object_metadata, |
- preconditions, gsutil_api, cls.logger, cls, |
- _ParallelUploadCopyExceptionHandler, |
- gzip_exts=None, allow_splitting=False) |
- finally: |
- if global_copy_helper_opts.canned_acl: |
- gsutil_api.prefer_api = orig_prefer_api |
- |
- component = ret[2] |
- _AppendComponentTrackerToParallelUploadTrackerFile( |
- args.tracker_file, component, args.tracker_file_lock) |
- return ret |
- |
- |
-CopyHelperOpts = namedtuple('CopyHelperOpts', [ |
- 'perform_mv', |
- 'no_clobber', |
- 'daisy_chain', |
- 'read_args_from_stdin', |
- 'print_ver', |
- 'use_manifest', |
- 'preserve_acl', |
- 'canned_acl', |
- 'skip_unsupported_objects', |
- 'test_callback_file']) |
- |
- |
-# pylint: disable=global-variable-undefined |
-def CreateCopyHelperOpts(perform_mv=False, no_clobber=False, daisy_chain=False, |
- read_args_from_stdin=False, print_ver=False, |
- use_manifest=False, preserve_acl=False, |
- canned_acl=None, skip_unsupported_objects=False, |
- test_callback_file=None): |
- """Creates CopyHelperOpts for passing options to CopyHelper.""" |
-  # We create a tuple with the union of options needed by CopyHelper and any |
- # copy-related functionality in CpCommand, RsyncCommand, or Command class. |
- global global_copy_helper_opts |
- global_copy_helper_opts = CopyHelperOpts( |
- perform_mv=perform_mv, |
- no_clobber=no_clobber, |
- daisy_chain=daisy_chain, |
- read_args_from_stdin=read_args_from_stdin, |
- print_ver=print_ver, |
- use_manifest=use_manifest, |
- preserve_acl=preserve_acl, |
- canned_acl=canned_acl, |
- skip_unsupported_objects=skip_unsupported_objects, |
- test_callback_file=test_callback_file) |
- return global_copy_helper_opts |
- |
- |
-# pylint: disable=global-variable-undefined |
-# pylint: disable=global-variable-not-assigned |
-def GetCopyHelperOpts(): |
- """Returns namedtuple holding CopyHelper options.""" |
- global global_copy_helper_opts |
- return global_copy_helper_opts |
- |
- |
-def _SelectDownloadStrategy(dst_url): |
- """Get download strategy based on the destination object. |
- |
- Args: |
- dst_url: Destination StorageUrl. |
- |
- Returns: |
- gsutil Cloud API DownloadStrategy. |
- """ |
- dst_is_special = False |
- if dst_url.IsFileUrl(): |
- # Check explicitly first because os.stat doesn't work on 'nul' in Windows. |
- if dst_url.object_name == os.devnull: |
- dst_is_special = True |
- try: |
- mode = os.stat(dst_url.object_name).st_mode |
- if stat.S_ISCHR(mode): |
- dst_is_special = True |
- except OSError: |
- pass |
- |
- if dst_is_special: |
- return CloudApi.DownloadStrategy.ONE_SHOT |
- else: |
- return CloudApi.DownloadStrategy.RESUMABLE |
- |
- |
-def _GetUploadTrackerData(tracker_file_name, logger): |
- """Reads tracker data from an upload tracker file if it exists. |
- |
- Args: |
- tracker_file_name: Tracker file name for this upload. |
- logger: for outputting log messages. |
- |
- Returns: |
- Serialization data if the tracker file already exists (resume existing |
- upload), None otherwise. |
- """ |
- tracker_file = None |
- |
- # If we already have a matching tracker file, get the serialization data |
- # so that we can resume the upload. |
- try: |
- tracker_file = open(tracker_file_name, 'r') |
- tracker_data = tracker_file.read() |
- return tracker_data |
- except IOError as e: |
-    # Ignore non-existent file (happens first time an upload is attempted on an |
- # object, or when re-starting an upload after a |
- # ResumableUploadStartOverException), but warn user for other errors. |
- if e.errno != errno.ENOENT: |
- logger.warn('Couldn\'t read upload tracker file (%s): %s. Restarting ' |
- 'upload from scratch.', tracker_file_name, e.strerror) |
- finally: |
- if tracker_file: |
- tracker_file.close() |
- |
- |
-def InsistDstUrlNamesContainer(exp_dst_url, have_existing_dst_container, |
- command_name): |
- """Ensures the destination URL names a container. |
- |
- Acceptable containers include directory, bucket, bucket |
- subdir, and non-existent bucket subdir. |
- |
- Args: |
- exp_dst_url: Wildcard-expanded destination StorageUrl. |
- have_existing_dst_container: bool indicator of whether exp_dst_url |
- names a container (directory, bucket, or existing bucket subdir). |
- command_name: Name of command making call. May not be the same as the |
- calling class's self.command_name in the case of commands implemented |
- atop other commands (like mv command). |
- |
- Raises: |
- CommandException: if the URL being checked does not name a container. |
- """ |
- if ((exp_dst_url.IsFileUrl() and not exp_dst_url.IsDirectory()) or |
- (exp_dst_url.IsCloudUrl() and exp_dst_url.IsBucket() |
- and not have_existing_dst_container)): |
- raise CommandException('Destination URL must name a directory, bucket, ' |
- 'or bucket\nsubdirectory for the multiple ' |
- 'source form of the %s command.' % command_name) |
- |
- |
-def _ShouldTreatDstUrlAsBucketSubDir(have_multiple_srcs, dst_url, |
- have_existing_dest_subdir, |
- src_url_names_container, |
- recursion_requested): |
- """Checks whether dst_url should be treated as a bucket "sub-directory". |
- |
- The decision about whether something constitutes a bucket "sub-directory" |
- depends on whether there are multiple sources in this request and whether |
- there is an existing bucket subdirectory. For example, when running the |
- command: |
- gsutil cp file gs://bucket/abc |
- if there's no existing gs://bucket/abc bucket subdirectory we should copy |
- file to the object gs://bucket/abc. In contrast, if |
- there's an existing gs://bucket/abc bucket subdirectory we should copy |
- file to gs://bucket/abc/file. And regardless of whether gs://bucket/abc |
- exists, when running the command: |
- gsutil cp file1 file2 gs://bucket/abc |
- we should copy file1 to gs://bucket/abc/file1 (and similarly for file2). |
- Finally, for recursive copies, if the source is a container then we should |
- copy to a container as the target. For example, when running the command: |
- gsutil cp -r dir1 gs://bucket/dir2 |
- we should copy the subtree of dir1 to gs://bucket/dir2. |
- |
- Note that we don't disallow naming a bucket "sub-directory" where there's |
- already an object at that URL. For example it's legitimate (albeit |
- confusing) to have an object called gs://bucket/dir and |
- then run the command |
- gsutil cp file1 file2 gs://bucket/dir |
- Doing so will end up with objects gs://bucket/dir, gs://bucket/dir/file1, |
- and gs://bucket/dir/file2. |
- |
- Args: |
- have_multiple_srcs: Bool indicator of whether this is a multi-source |
- operation. |
- dst_url: StorageUrl to check. |
- have_existing_dest_subdir: bool indicator whether dest is an existing |
- subdirectory. |
- src_url_names_container: bool indicator of whether the source URL |
- is a container. |
- recursion_requested: True if a recursive operation has been requested. |
- |
- Returns: |
- bool indicator. |
- """ |
- if have_existing_dest_subdir: |
- return True |
- if dst_url.IsCloudUrl(): |
- return (have_multiple_srcs or |
- (src_url_names_container and recursion_requested)) |
- |
- |
-def _ShouldTreatDstUrlAsSingleton(have_multiple_srcs, |
- have_existing_dest_subdir, dst_url, |
- recursion_requested): |
- """Checks that dst_url names a single file/object after wildcard expansion. |
- |
- It is possible that an object path might name a bucket sub-directory. |
- |
- Args: |
- have_multiple_srcs: Bool indicator of whether this is a multi-source |
- operation. |
- have_existing_dest_subdir: bool indicator whether dest is an existing |
- subdirectory. |
- dst_url: StorageUrl to check. |
- recursion_requested: True if a recursive operation has been requested. |
- |
- Returns: |
- bool indicator. |
- """ |
- if recursion_requested: |
- return False |
- if dst_url.IsFileUrl(): |
- return not dst_url.IsDirectory() |
- else: # dst_url.IsCloudUrl() |
- return (not have_multiple_srcs and |
- not have_existing_dest_subdir and |
- dst_url.IsObject()) |
- |
- |
-def ConstructDstUrl(src_url, exp_src_url, src_url_names_container, |
- have_multiple_srcs, exp_dst_url, have_existing_dest_subdir, |
- recursion_requested): |
- """Constructs the destination URL for a given exp_src_url/exp_dst_url pair. |
- |
- Uses context-dependent naming rules that mimic Linux cp and mv behavior. |
- |
- Args: |
- src_url: Source StorageUrl to be copied. |
- exp_src_url: Single StorageUrl from wildcard expansion of src_url. |
- src_url_names_container: True if src_url names a container (including the |
- case of a wildcard-named bucket subdir (like gs://bucket/abc, |
-        where gs://bucket/abc/* matched some objects)). |
- have_multiple_srcs: True if this is a multi-source request. This can be |
- true if src_url wildcard-expanded to multiple URLs or if there were |
- multiple source URLs in the request. |
- exp_dst_url: the expanded StorageUrl requested for the cp destination. |
- Final written path is constructed from this plus a context-dependent |
- variant of src_url. |
- have_existing_dest_subdir: bool indicator whether dest is an existing |
- subdirectory. |
- recursion_requested: True if a recursive operation has been requested. |
- |
- Returns: |
- StorageUrl to use for copy. |
- |
- Raises: |
-    CommandException: if no destination object name is specified and the |
-    source is a stream. |
- """ |
- if _ShouldTreatDstUrlAsSingleton( |
- have_multiple_srcs, have_existing_dest_subdir, exp_dst_url, |
- recursion_requested): |
- # We're copying one file or object to one file or object. |
- return exp_dst_url |
- |
- if exp_src_url.IsFileUrl() and exp_src_url.IsStream(): |
- if have_existing_dest_subdir: |
- raise CommandException('Destination object name needed when ' |
- 'source is a stream') |
- return exp_dst_url |
- |
- if not recursion_requested and not have_multiple_srcs: |
- # We're copying one file or object to a subdirectory. Append final comp |
- # of exp_src_url to exp_dst_url. |
- src_final_comp = exp_src_url.object_name.rpartition(src_url.delim)[-1] |
- return StorageUrlFromString('%s%s%s' % ( |
- exp_dst_url.url_string.rstrip(exp_dst_url.delim), |
- exp_dst_url.delim, src_final_comp)) |
- |
- # Else we're copying multiple sources to a directory, bucket, or a bucket |
- # "sub-directory". |
- |
- # Ensure exp_dst_url ends in delim char if we're doing a multi-src copy or |
- # a copy to a directory. (The check for copying to a directory needs |
- # special-case handling so that the command: |
- # gsutil cp gs://bucket/obj dir |
- # will turn into file://dir/ instead of file://dir -- the latter would cause |
- # the file "dirobj" to be created.) |
- # Note: need to check have_multiple_srcs or src_url.names_container() |
- # because src_url could be a bucket containing a single object, named |
- # as gs://bucket. |
- if ((have_multiple_srcs or src_url_names_container or |
- (exp_dst_url.IsFileUrl() and exp_dst_url.IsDirectory())) |
- and not exp_dst_url.url_string.endswith(exp_dst_url.delim)): |
- exp_dst_url = StorageUrlFromString('%s%s' % (exp_dst_url.url_string, |
- exp_dst_url.delim)) |
- |
- # Making naming behavior match how things work with local Linux cp and mv |
- # operations depends on many factors, including whether the destination is a |
- # container, the plurality of the source(s), and whether the mv command is |
- # being used: |
- # 1. For the "mv" command that specifies a non-existent destination subdir, |
- # renaming should occur at the level of the src subdir, vs appending that |
-  #    subdir beneath the dst subdir as is done for copying. For example: |
- # gsutil rm -r gs://bucket |
- # gsutil cp -r dir1 gs://bucket |
- # gsutil cp -r dir2 gs://bucket/subdir1 |
- # gsutil mv gs://bucket/subdir1 gs://bucket/subdir2 |
- # would (if using cp naming behavior) end up with paths like: |
- # gs://bucket/subdir2/subdir1/dir2/.svn/all-wcprops |
- # whereas mv naming behavior should result in: |
- # gs://bucket/subdir2/dir2/.svn/all-wcprops |
- # 2. Copying from directories, buckets, or bucket subdirs should result in |
- # objects/files mirroring the source directory hierarchy. For example: |
- # gsutil cp dir1/dir2 gs://bucket |
-  #   should create the object gs://bucket/dir2/file2 (assuming dir1/dir2 |
- # contains file2). |
- # To be consistent with Linux cp behavior, there's one more wrinkle when |
- # working with subdirs: The resulting object names depend on whether the |
- # destination subdirectory exists. For example, if gs://bucket/subdir |
- # exists, the command: |
- # gsutil cp -r dir1/dir2 gs://bucket/subdir |
- # should create objects named like gs://bucket/subdir/dir2/a/b/c. In |
- # contrast, if gs://bucket/subdir does not exist, this same command |
- # should create objects named like gs://bucket/subdir/a/b/c. |
- # 3. Copying individual files or objects to dirs, buckets or bucket subdirs |
- # should result in objects/files named by the final source file name |
- # component. Example: |
- # gsutil cp dir1/*.txt gs://bucket |
- # should create the objects gs://bucket/f1.txt and gs://bucket/f2.txt, |
- # assuming dir1 contains f1.txt and f2.txt. |
- |
- recursive_move_to_new_subdir = False |
- if (global_copy_helper_opts.perform_mv and recursion_requested |
- and src_url_names_container and not have_existing_dest_subdir): |
- # Case 1. Handle naming rules for bucket subdir mv. Here we want to |
- # line up the src_url against its expansion, to find the base to build |
- # the new name. For example, running the command: |
- # gsutil mv gs://bucket/abcd gs://bucket/xyz |
- # when processing exp_src_url=gs://bucket/abcd/123 |
- # exp_src_url_tail should become /123 |
- # Note: mv.py code disallows wildcard specification of source URL. |
- recursive_move_to_new_subdir = True |
- exp_src_url_tail = ( |
- exp_src_url.url_string[len(src_url.url_string):]) |
- dst_key_name = '%s/%s' % (exp_dst_url.object_name.rstrip('/'), |
- exp_src_url_tail.strip('/')) |
- |
- elif src_url_names_container and (exp_dst_url.IsCloudUrl() or |
- exp_dst_url.IsDirectory()): |
- # Case 2. Container copy to a destination other than a file. |
- # Build dst_key_name from subpath of exp_src_url past |
- # where src_url ends. For example, for src_url=gs://bucket/ and |
- # exp_src_url=gs://bucket/src_subdir/obj, dst_key_name should be |
- # src_subdir/obj. |
- src_url_path_sans_final_dir = GetPathBeforeFinalDir(src_url) |
- dst_key_name = exp_src_url.versionless_url_string[ |
- len(src_url_path_sans_final_dir):].lstrip(src_url.delim) |
- # Handle case where dst_url is a non-existent subdir. |
- if not have_existing_dest_subdir: |
- dst_key_name = dst_key_name.partition(src_url.delim)[-1] |
- # Handle special case where src_url was a directory named with '.' or |
- # './', so that running a command like: |
- # gsutil cp -r . gs://dest |
- # will produce obj names of the form gs://dest/abc instead of |
- # gs://dest/./abc. |
- if dst_key_name.startswith('.%s' % os.sep): |
- dst_key_name = dst_key_name[2:] |
- |
- else: |
- # Case 3. |
- dst_key_name = exp_src_url.object_name.rpartition(src_url.delim)[-1] |
- |
- if (not recursive_move_to_new_subdir and ( |
- exp_dst_url.IsFileUrl() or _ShouldTreatDstUrlAsBucketSubDir( |
- have_multiple_srcs, exp_dst_url, have_existing_dest_subdir, |
- src_url_names_container, recursion_requested))): |
- if exp_dst_url.object_name and exp_dst_url.object_name.endswith( |
- exp_dst_url.delim): |
- dst_key_name = '%s%s%s' % ( |
- exp_dst_url.object_name.rstrip(exp_dst_url.delim), |
- exp_dst_url.delim, dst_key_name) |
- else: |
- delim = exp_dst_url.delim if exp_dst_url.object_name else '' |
- dst_key_name = '%s%s%s' % (exp_dst_url.object_name or '', |
- delim, dst_key_name) |
- |
- new_exp_dst_url = exp_dst_url.Clone() |
- new_exp_dst_url.object_name = dst_key_name.replace(src_url.delim, |
- exp_dst_url.delim) |
- return new_exp_dst_url |
- |
- |
-def _CreateDigestsFromDigesters(digesters): |
- digests = {} |
- if digesters: |
- for alg in digesters: |
- digests[alg] = base64.encodestring( |
- digesters[alg].digest()).rstrip('\n') |
- return digests |
- |
- |
-def _CreateDigestsFromLocalFile(logger, algs, file_name, src_obj_metadata): |
- """Creates a base64 CRC32C and/or MD5 digest from file_name. |
- |
- Args: |
- logger: for outputting log messages. |
- algs: list of algorithms to compute. |
- file_name: file to digest. |
-    src_obj_metadata: metadata of source object. |
- |
- Returns: |
- Dict of algorithm name : base 64 encoded digest |
- """ |
- hash_dict = {} |
- if 'md5' in algs: |
- if src_obj_metadata.size and src_obj_metadata.size > TEN_MIB: |
- logger.info( |
- 'Computing MD5 for %s...', file_name) |
- hash_dict['md5'] = md5() |
- if 'crc32c' in algs: |
- hash_dict['crc32c'] = crcmod.predefined.Crc('crc-32c') |
- with open(file_name, 'rb') as fp: |
- CalculateHashesFromContents( |
- fp, hash_dict, ProgressCallbackWithBackoff( |
- src_obj_metadata.size, |
- FileProgressCallbackHandler( |
- ConstructAnnounceText('Hashing', file_name), logger).call)) |
- digests = {} |
- for alg_name, digest in hash_dict.iteritems(): |
- digests[alg_name] = Base64EncodeHash(digest.hexdigest()) |
- return digests |
- |
- |
-def _CheckCloudHashes(logger, src_url, dst_url, src_obj_metadata, |
- dst_obj_metadata): |
- """Validates integrity of two cloud objects copied via daisy-chain. |
- |
- Args: |
- logger: for outputting log messages. |
- src_url: CloudUrl for source cloud object. |
- dst_url: CloudUrl for destination cloud object. |
- src_obj_metadata: Cloud Object metadata for object being downloaded from. |
- dst_obj_metadata: Cloud Object metadata for object being uploaded to. |
- |
- Raises: |
- CommandException: if cloud digests don't match local digests. |
- """ |
- checked_one = False |
- download_hashes = {} |
- upload_hashes = {} |
- if src_obj_metadata.md5Hash: |
- download_hashes['md5'] = src_obj_metadata.md5Hash |
- if src_obj_metadata.crc32c: |
- download_hashes['crc32c'] = src_obj_metadata.crc32c |
- if dst_obj_metadata.md5Hash: |
- upload_hashes['md5'] = dst_obj_metadata.md5Hash |
- if dst_obj_metadata.crc32c: |
- upload_hashes['crc32c'] = dst_obj_metadata.crc32c |
- |
- for alg, upload_b64_digest in upload_hashes.iteritems(): |
- if alg not in download_hashes: |
- continue |
- |
- download_b64_digest = download_hashes[alg] |
- logger.debug( |
- 'Comparing source vs destination %s-checksum for %s. (%s/%s)', alg, |
- dst_url, download_b64_digest, upload_b64_digest) |
- if download_b64_digest != upload_b64_digest: |
- raise HashMismatchException( |
- '%s signature for source object (%s) doesn\'t match ' |
- 'destination object digest (%s). Object (%s) will be deleted.' % ( |
- alg, download_b64_digest, upload_b64_digest, dst_url)) |
- checked_one = True |
- if not checked_one: |
- # One known way this can currently happen is when downloading objects larger |
- # than 5 GiB from S3 (for which the etag is not an MD5). |
- logger.warn( |
- 'WARNING: Found no hashes to validate object downloaded from %s and ' |
- 'uploaded to %s. Integrity cannot be assured without hashes.', |
- src_url, dst_url) |
- |
- |
-def _CheckHashes(logger, obj_url, obj_metadata, file_name, digests, |
- is_upload=False): |
- """Validates integrity by comparing cloud digest to local digest. |
- |
- Args: |
- logger: for outputting log messages. |
- obj_url: CloudUrl for cloud object. |
- obj_metadata: Cloud Object being downloaded from or uploaded to. |
- file_name: Local file name on disk being downloaded to or uploaded from. |
- digests: Computed Digests for the object. |
- is_upload: If true, comparing for an uploaded object (controls logging). |
- |
- Raises: |
- CommandException: if cloud digests don't match local digests. |
- """ |
- local_hashes = digests |
- cloud_hashes = {} |
- if obj_metadata.md5Hash: |
- cloud_hashes['md5'] = obj_metadata.md5Hash.rstrip('\n') |
- if obj_metadata.crc32c: |
- cloud_hashes['crc32c'] = obj_metadata.crc32c.rstrip('\n') |
- |
- checked_one = False |
- for alg in local_hashes: |
- if alg not in cloud_hashes: |
- continue |
- |
- local_b64_digest = local_hashes[alg] |
- cloud_b64_digest = cloud_hashes[alg] |
- logger.debug( |
- 'Comparing local vs cloud %s-checksum for %s. (%s/%s)', alg, file_name, |
- local_b64_digest, cloud_b64_digest) |
- if local_b64_digest != cloud_b64_digest: |
- |
- raise HashMismatchException( |
- '%s signature computed for local file (%s) doesn\'t match ' |
- 'cloud-supplied digest (%s). %s (%s) will be deleted.' % ( |
- alg, local_b64_digest, cloud_b64_digest, |
- 'Cloud object' if is_upload else 'Local file', |
- obj_url if is_upload else file_name)) |
- checked_one = True |
- if not checked_one: |
- if is_upload: |
- logger.warn( |
- 'WARNING: Found no hashes to validate object uploaded to %s. ' |
- 'Integrity cannot be assured without hashes.', obj_url) |
- else: |
- # One known way this can currently happen is when downloading objects larger |
- # than 5 GB from S3 (for which the etag is not an MD5). |
- logger.warn( |
- 'WARNING: Found no hashes to validate object downloaded to %s. ' |
- 'Integrity cannot be assured without hashes.', file_name) |
- |
- |
-def IsNoClobberServerException(e): |
- """Checks to see if the server attempted to clobber a file. |
- |
- In this case we specified via a precondition that we didn't want the file |
- clobbered. |
- |
- Args: |
- e: The Exception that was generated by a failed copy operation |
- |
- Returns: |
- bool indicator - True indicates that the server did attempt to clobber |
- an existing file. |
- """ |
- return ((isinstance(e, PreconditionException)) or |
- (isinstance(e, ResumableUploadException) and '412' in e.message)) |
- |
- |
-def CheckForDirFileConflict(exp_src_url, dst_url): |
- """Checks whether copying exp_src_url into dst_url is not possible. |
- |
-  This happens if a directory exists in the local file system where a file |
-  needs to go or vice versa. In that case we print an error message and |
-  exit. Example: if the file "./x" exists and you try to do: |
- gsutil cp gs://mybucket/x/y . |
- the request can't succeed because it requires a directory where |
- the file x exists. |
- |
- Note that we don't enforce any corresponding restrictions for buckets, |
- because the flat namespace semantics for buckets doesn't prohibit such |
- cases the way hierarchical file systems do. For example, if a bucket |
- contains an object called gs://bucket/dir and then you run the command: |
- gsutil cp file1 file2 gs://bucket/dir |
- you'll end up with objects gs://bucket/dir, gs://bucket/dir/file1, and |
- gs://bucket/dir/file2. |
- |
- Args: |
- exp_src_url: Expanded source StorageUrl. |
- dst_url: Destination StorageUrl. |
- |
- Raises: |
- CommandException: if errors encountered. |
- """ |
- if dst_url.IsCloudUrl(): |
- # The problem can only happen for file destination URLs. |
- return |
- dst_path = dst_url.object_name |
- final_dir = os.path.dirname(dst_path) |
- if os.path.isfile(final_dir): |
- raise CommandException('Cannot retrieve %s because a file exists ' |
- 'where a directory needs to be created (%s).' % |
- (exp_src_url.url_string, final_dir)) |
- if os.path.isdir(dst_path): |
- raise CommandException('Cannot retrieve %s because a directory exists ' |
- '(%s) where the file needs to be created.' % |
- (exp_src_url.url_string, dst_path)) |
- |
- |
-def _PartitionFile(fp, file_size, src_url, content_type, canned_acl, |
- dst_bucket_url, random_prefix, tracker_file, |
- tracker_file_lock): |
- """Partitions a file into FilePart objects to be uploaded and later composed. |
- |
- These objects, when composed, will match the original file. This entails |
- splitting the file into parts, naming and forming a destination URL for each |
- part, and also providing the PerformParallelUploadFileToObjectArgs |
- corresponding to each part. |
- |
- Args: |
- fp: The file object to be partitioned. |
- file_size: The size of fp, in bytes. |
- src_url: Source FileUrl from the original command. |
- content_type: content type for the component and final objects. |
- canned_acl: The user-provided canned_acl, if applicable. |
- dst_bucket_url: CloudUrl for the destination bucket |
- random_prefix: The randomly-generated prefix used to prevent collisions |
- among the temporary component names. |
- tracker_file: The path to the parallel composite upload tracker file. |
- tracker_file_lock: The lock protecting access to the tracker file. |
- |
- Returns: |
-    dst_args: Dict mapping temporary component object names to the |
-        PerformParallelUploadFileToObjectArgs used to upload them. |
- """ |
- parallel_composite_upload_component_size = HumanReadableToBytes( |
- config.get('GSUtil', 'parallel_composite_upload_component_size', |
- DEFAULT_PARALLEL_COMPOSITE_UPLOAD_COMPONENT_SIZE)) |
- (num_components, component_size) = _GetPartitionInfo( |
- file_size, MAX_COMPOSE_ARITY, parallel_composite_upload_component_size) |
- |
- dst_args = {} # Arguments to create commands and pass to subprocesses. |
- file_names = [] # Used for the 2-step process of forming dst_args. |
- for i in range(num_components): |
- # "Salt" the object name with something a user is very unlikely to have |
- # used in an object name, then hash the extended name to make sure |
- # we don't run into problems with name length. Using a deterministic |
- # naming scheme for the temporary components allows users to take |
- # advantage of resumable uploads for each component. |
- encoded_name = (PARALLEL_UPLOAD_STATIC_SALT + fp.name).encode(UTF8) |
- content_md5 = md5() |
- content_md5.update(encoded_name) |
- digest = content_md5.hexdigest() |
- temp_file_name = (random_prefix + PARALLEL_UPLOAD_TEMP_NAMESPACE + |
- digest + '_' + str(i)) |
- tmp_dst_url = dst_bucket_url.Clone() |
- tmp_dst_url.object_name = temp_file_name |
- |
- if i < (num_components - 1): |
- # Every component except possibly the last is the same size. |
- file_part_length = component_size |
- else: |
- # The last component just gets all of the remaining bytes. |
- file_part_length = (file_size - ((num_components -1) * component_size)) |
- offset = i * component_size |
- func_args = PerformParallelUploadFileToObjectArgs( |
- fp.name, offset, file_part_length, src_url, tmp_dst_url, canned_acl, |
- content_type, tracker_file, tracker_file_lock) |
- file_names.append(temp_file_name) |
- dst_args[temp_file_name] = func_args |
- |
- return dst_args |
- |
- |
-def _DoParallelCompositeUpload(fp, src_url, dst_url, dst_obj_metadata, |
- canned_acl, file_size, preconditions, gsutil_api, |
- command_obj, copy_exception_handler): |
- """Uploads a local file to a cloud object using parallel composite upload. |
- |
- The file is partitioned into parts, and then the parts are uploaded in |
- parallel, composed to form the original destination object, and deleted. |
- |
- Args: |
- fp: The file object to be uploaded. |
- src_url: FileUrl representing the local file. |
- dst_url: CloudUrl representing the destination file. |
- dst_obj_metadata: apitools Object describing the destination object. |
- canned_acl: The canned acl to apply to the object, if any. |
- file_size: The size of the source file in bytes. |
- preconditions: Cloud API Preconditions for the final object. |
- gsutil_api: gsutil Cloud API instance to use. |
- command_obj: Command object (for calling Apply). |
- copy_exception_handler: Copy exception handler (for use in Apply). |
- |
- Returns: |
- Elapsed upload time, uploaded Object with generation, crc32c, and size |
- fields populated. |
- """ |
- start_time = time.time() |
- dst_bucket_url = StorageUrlFromString(dst_url.bucket_url_string) |
- api_selector = gsutil_api.GetApiSelector(provider=dst_url.scheme) |
- # Determine which components, if any, have already been successfully |
- # uploaded. |
- tracker_file = GetTrackerFilePath(dst_url, TrackerFileType.PARALLEL_UPLOAD, |
- api_selector, src_url) |
- tracker_file_lock = CreateLock() |
- (random_prefix, existing_components) = ( |
- _ParseParallelUploadTrackerFile(tracker_file, tracker_file_lock)) |
- |
- # Create the initial tracker file for the upload. |
- _CreateParallelUploadTrackerFile(tracker_file, random_prefix, |
- existing_components, tracker_file_lock) |
- |
- # Get the set of all components that should be uploaded. |
- dst_args = _PartitionFile( |
- fp, file_size, src_url, dst_obj_metadata.contentType, canned_acl, |
- dst_bucket_url, random_prefix, tracker_file, tracker_file_lock) |
- |
- (components_to_upload, existing_components, existing_objects_to_delete) = ( |
- FilterExistingComponents(dst_args, existing_components, dst_bucket_url, |
- gsutil_api)) |
- |
- # In parallel, copy all of the file parts that haven't already been |
- # uploaded to temporary objects. |
- cp_results = command_obj.Apply( |
- _PerformParallelUploadFileToObject, components_to_upload, |
- copy_exception_handler, ('op_failure_count', 'total_bytes_transferred'), |
- arg_checker=gslib.command.DummyArgChecker, |
- parallel_operations_override=True, should_return_results=True) |
- uploaded_components = [] |
- for cp_result in cp_results: |
- uploaded_components.append(cp_result[2]) |
- components = uploaded_components + existing_components |
- |
- if len(components) == len(dst_args): |
- # Only try to compose if all of the components were uploaded successfully. |
- |
- def _GetComponentNumber(component): |
- return int(component.object_name[component.object_name.rfind('_')+1:]) |
- # Sort the components so that they will be composed in the correct order. |
- components = sorted(components, key=_GetComponentNumber) |
- |
- request_components = [] |
- for component_url in components: |
- src_obj_metadata = ( |
- apitools_messages.ComposeRequest.SourceObjectsValueListEntry( |
- name=component_url.object_name)) |
- if component_url.HasGeneration(): |
- src_obj_metadata.generation = long(component_url.generation) |
- request_components.append(src_obj_metadata) |
- |
- composed_object = gsutil_api.ComposeObject( |
- request_components, dst_obj_metadata, preconditions=preconditions, |
- provider=dst_url.scheme, fields=['generation', 'crc32c', 'size']) |
- |
- try: |
- # Make sure only to delete things that we know were successfully |
- # uploaded (as opposed to all of the objects that we attempted to |
- # create) so that we don't delete any preexisting objects, except for |
- # those that were uploaded by a previous, failed run and have since |
- # changed (but still have an old generation lying around). |
- objects_to_delete = components + existing_objects_to_delete |
- command_obj.Apply(_DeleteObjectFn, objects_to_delete, _RmExceptionHandler, |
- arg_checker=gslib.command.DummyArgChecker, |
- parallel_operations_override=True) |
- except Exception: # pylint: disable=broad-except |
- # If some of the delete calls fail, don't cause the whole command to |
- # fail. The copy was successful iff the compose call succeeded, so |
- # reduce this to a warning. |
- logging.warning( |
- 'Failed to delete some of the following temporary objects:\n' + |
- '\n'.join(dst_args.keys())) |
- finally: |
- with tracker_file_lock: |
- if os.path.exists(tracker_file): |
- os.unlink(tracker_file) |
- else: |
- # Some of the components failed to upload. In this case, we want to exit |
- # without deleting the objects. |
- raise CommandException( |
- 'Some temporary components were not uploaded successfully. ' |
- 'Please retry this upload.') |
- |
- elapsed_time = time.time() - start_time |
- return elapsed_time, composed_object |
- |
- |
-def _ShouldDoParallelCompositeUpload(logger, allow_splitting, src_url, dst_url, |
- file_size, canned_acl=None): |
- """Determines whether parallel composite upload strategy should be used. |
- |
- Args: |
- logger: for outputting log messages. |
- allow_splitting: If false, then this function returns false. |
- src_url: FileUrl corresponding to a local file. |
- dst_url: CloudUrl corresponding to destination cloud object. |
- file_size: The size of the source file, in bytes. |
- canned_acl: Canned ACL to apply to destination object, if any. |
- |
- Returns: |
- True iff a parallel upload should be performed on the source file. |
- """ |
- global suggested_parallel_composites |
- parallel_composite_upload_threshold = HumanReadableToBytes(config.get( |
- 'GSUtil', 'parallel_composite_upload_threshold', |
- DEFAULT_PARALLEL_COMPOSITE_UPLOAD_THRESHOLD)) |
- |
- all_factors_but_size = ( |
- allow_splitting # Don't split the pieces multiple times. |
- and not src_url.IsStream() # We can't partition streams. |
- and dst_url.scheme == 'gs' # Compose is only for gs. |
- and not canned_acl) # TODO: Implement canned ACL support for compose. |
- |
-  # Since parallel composite uploads are disabled by default, make the user |
-  # aware of them. |
- # TODO: Once compiled crcmod is being distributed by major Linux distributions |
- # remove this check. |
- if (all_factors_but_size and parallel_composite_upload_threshold == 0 |
- and file_size >= PARALLEL_COMPOSITE_SUGGESTION_THRESHOLD |
- and not suggested_parallel_composites): |
- logger.info('\n'.join(textwrap.wrap( |
- '==> NOTE: You are uploading one or more large file(s), which would ' |
- 'run significantly faster if you enable parallel composite uploads. ' |
- 'This feature can be enabled by editing the ' |
- '"parallel_composite_upload_threshold" value in your .boto ' |
- 'configuration file. However, note that if you do this you and any ' |
- 'users that download such composite files will need to have a compiled ' |
- 'crcmod installed (see "gsutil help crcmod").')) + '\n') |
- suggested_parallel_composites = True |
- |
- return (all_factors_but_size |
- and parallel_composite_upload_threshold > 0 |
- and file_size >= parallel_composite_upload_threshold) |
- |
- |
-def ExpandUrlToSingleBlr(url_str, gsutil_api, debug, project_id, |
- treat_nonexistent_object_as_subdir=False): |
- """Expands wildcard if present in url_str. |
- |
- Args: |
- url_str: String representation of requested url. |
- gsutil_api: gsutil Cloud API instance to use. |
- debug: debug level to use (for iterators). |
- project_id: project ID to use (for iterators). |
-    treat_nonexistent_object_as_subdir: indicates whether a non-existent |
-        object should be treated as a subdir. |
- |
- Returns: |
- (exp_url, have_existing_dst_container) |
- where exp_url is a StorageUrl |
- and have_existing_dst_container is a bool indicating whether |
- exp_url names an existing directory, bucket, or bucket subdirectory. |
- In the case where we match a subdirectory AND an object, the |
- object is returned. |
- |
- Raises: |
- CommandException: if url_str matched more than 1 URL. |
- """ |
- # Handle wildcarded url case. |
- if ContainsWildcard(url_str): |
- blr_expansion = list(CreateWildcardIterator(url_str, gsutil_api, |
- debug=debug, |
- project_id=project_id)) |
- if len(blr_expansion) != 1: |
- raise CommandException('Destination (%s) must match exactly 1 URL' % |
- url_str) |
- blr = blr_expansion[0] |
- # BLR is either an OBJECT, PREFIX, or BUCKET; the latter two represent |
- # directories. |
- return (StorageUrlFromString(blr.url_string), not blr.IsObject()) |
- |
- storage_url = StorageUrlFromString(url_str) |
- |
- # Handle non-wildcarded URL. |
- if storage_url.IsFileUrl(): |
- return (storage_url, storage_url.IsDirectory()) |
- |
- # At this point we have a cloud URL. |
- if storage_url.IsBucket(): |
- return (storage_url, True) |
- |
- # For object/prefix URLs check 3 cases: (a) if the name ends with '/' treat |
- # as a subdir; otherwise, use the wildcard iterator with url to |
- # find if (b) there's a Prefix matching url, or (c) name is of form |
- # dir_$folder$ (and in both these cases also treat dir as a subdir). |
- # Cloud subdirs are always considered to be an existing container. |
- if IsCloudSubdirPlaceholder(storage_url): |
- return (storage_url, True) |
- |
- # Check for the special case where we have a folder marker object. |
- folder_expansion = CreateWildcardIterator( |
- storage_url.versionless_url_string + '_$folder$', gsutil_api, |
- debug=debug, project_id=project_id).IterAll( |
- bucket_listing_fields=['name']) |
- for blr in folder_expansion: |
- return (storage_url, True) |
- |
- blr_expansion = CreateWildcardIterator(url_str, gsutil_api, |
- debug=debug, |
- project_id=project_id).IterAll( |
- bucket_listing_fields=['name']) |
- expansion_empty = True |
- for blr in blr_expansion: |
- expansion_empty = False |
- if blr.IsPrefix(): |
- return (storage_url, True) |
- |
- return (storage_url, |
- expansion_empty and treat_nonexistent_object_as_subdir) |
- |
- |
-def FixWindowsNaming(src_url, dst_url): |
- """Translates Windows pathnames to cloud pathnames. |
- |
- Rewrites the destination URL built by ConstructDstUrl(). |
- |
- Args: |
- src_url: Source StorageUrl to be copied. |
- dst_url: The destination StorageUrl built by ConstructDstUrl(). |
- |
- Returns: |
- StorageUrl to use for copy. |
- """ |
- if (src_url.IsFileUrl() and src_url.delim == '\\' |
- and dst_url.IsCloudUrl()): |
- trans_url_str = re.sub(r'\\', '/', dst_url.url_string) |
- dst_url = StorageUrlFromString(trans_url_str) |
- return dst_url |
- |
- |
-def SrcDstSame(src_url, dst_url): |
- """Checks if src_url and dst_url represent the same object or file. |
- |
- We don't handle anything about hard or symbolic links. |
- |
- Args: |
- src_url: Source StorageUrl. |
- dst_url: Destination StorageUrl. |
- |
- Returns: |
- Bool indicator. |
- """ |
- if src_url.IsFileUrl() and dst_url.IsFileUrl(): |
- # Translate a/b/./c to a/b/c, so src=dst comparison below works. |
- new_src_path = os.path.normpath(src_url.object_name) |
- new_dst_path = os.path.normpath(dst_url.object_name) |
- return new_src_path == new_dst_path |
- else: |
- return (src_url.url_string == dst_url.url_string and |
- src_url.generation == dst_url.generation) |
- |
- |
-def _LogCopyOperation(logger, src_url, dst_url, dst_obj_metadata): |
- """Logs copy operation, including Content-Type if appropriate. |
- |
- Args: |
- logger: logger instance to use for output. |
- src_url: Source StorageUrl. |
- dst_url: Destination StorageUrl. |
-    dst_obj_metadata: Object-specific metadata that should be overridden during |
- the copy. |
- """ |
- if (dst_url.IsCloudUrl() and dst_obj_metadata and |
- dst_obj_metadata.contentType): |
- content_type_msg = ' [Content-Type=%s]' % dst_obj_metadata.contentType |
- else: |
- content_type_msg = '' |
- if src_url.IsFileUrl() and src_url.IsStream(): |
- logger.info('Copying from <STDIN>%s...', content_type_msg) |
- else: |
- logger.info('Copying %s%s...', src_url.url_string, content_type_msg) |
- |
- |
-# pylint: disable=undefined-variable |
-def _CopyObjToObjInTheCloud(src_url, src_obj_metadata, dst_url, |
- dst_obj_metadata, preconditions, gsutil_api, |
- logger): |
- """Performs copy-in-the cloud from specified src to dest object. |
- |
- Args: |
- src_url: Source CloudUrl. |
- src_obj_metadata: Metadata for source object; must include etag and size. |
- dst_url: Destination CloudUrl. |
-    dst_obj_metadata: Object-specific metadata that should be overridden during |
- the copy. |
- preconditions: Preconditions to use for the copy. |
- gsutil_api: gsutil Cloud API instance to use for the copy. |
- logger: logging.Logger for log message output. |
- |
- Returns: |
- (elapsed_time, bytes_transferred, dst_url with generation, |
- md5 hash of destination) excluding overhead like initial GET. |
- |
- Raises: |
- CommandException: if errors encountered. |
- """ |
- start_time = time.time() |
- |
- progress_callback = FileProgressCallbackHandler( |
- ConstructAnnounceText('Copying', dst_url.url_string), logger).call |
- if global_copy_helper_opts.test_callback_file: |
- with open(global_copy_helper_opts.test_callback_file, 'rb') as test_fp: |
- progress_callback = pickle.loads(test_fp.read()).call |
- dst_obj = gsutil_api.CopyObject( |
- src_obj_metadata, dst_obj_metadata, src_generation=src_url.generation, |
- canned_acl=global_copy_helper_opts.canned_acl, |
- preconditions=preconditions, progress_callback=progress_callback, |
- provider=dst_url.scheme, fields=UPLOAD_RETURN_FIELDS) |
- |
- end_time = time.time() |
- |
- result_url = dst_url.Clone() |
- result_url.generation = GenerationFromUrlAndString(result_url, |
- dst_obj.generation) |
- |
- return (end_time - start_time, src_obj_metadata.size, result_url, |
- dst_obj.md5Hash) |
- |
- |
-def _CheckFreeSpace(path): |
- """Return path/drive free space (in bytes).""" |
- if IS_WINDOWS: |
- # pylint: disable=g-import-not-at-top |
- try: |
- # pylint: disable=invalid-name |
- get_disk_free_space_ex = WINFUNCTYPE(c_int, c_wchar_p, |
- POINTER(c_uint64), |
- POINTER(c_uint64), |
- POINTER(c_uint64)) |
- get_disk_free_space_ex = get_disk_free_space_ex( |
- ('GetDiskFreeSpaceExW', windll.kernel32), ( |
- (1, 'lpszPathName'), |
- (2, 'lpFreeUserSpace'), |
- (2, 'lpTotalSpace'), |
- (2, 'lpFreeSpace'),)) |
- except AttributeError: |
- get_disk_free_space_ex = WINFUNCTYPE(c_int, c_char_p, |
- POINTER(c_uint64), |
- POINTER(c_uint64), |
- POINTER(c_uint64)) |
- get_disk_free_space_ex = get_disk_free_space_ex( |
- ('GetDiskFreeSpaceExA', windll.kernel32), ( |
- (1, 'lpszPathName'), |
- (2, 'lpFreeUserSpace'), |
- (2, 'lpTotalSpace'), |
- (2, 'lpFreeSpace'),)) |
- |
- def GetDiskFreeSpaceExErrCheck(result, unused_func, args): |
- if not result: |
- raise WinError() |
- return args[1].value |
- get_disk_free_space_ex.errcheck = GetDiskFreeSpaceExErrCheck |
- |
- return get_disk_free_space_ex(os.getenv('SystemDrive')) |
- else: |
- (_, f_frsize, _, _, f_bavail, _, _, _, _, _) = os.statvfs(path) |
- return f_frsize * f_bavail |
- |
- |
-def _SetContentTypeFromFile(src_url, dst_obj_metadata): |
- """Detects and sets Content-Type if src_url names a local file. |
- |
- Args: |
- src_url: Source StorageUrl. |
-    dst_obj_metadata: Object-specific metadata that should be overridden during |
- the copy. |
- """ |
- # contentType == '' if user requested default type. |
- if (dst_obj_metadata.contentType is None and src_url.IsFileUrl() |
- and not src_url.IsStream()): |
- # Only do content type recognition if src_url is a file. Object-to-object |
- # copies with no -h Content-Type specified re-use the content type of the |
- # source object. |
- object_name = src_url.object_name |
- content_type = None |
- # Streams (denoted by '-') are expected to be 'application/octet-stream' |
- # and 'file' would partially consume them. |
- if object_name != '-': |
- if config.getbool('GSUtil', 'use_magicfile', False): |
- p = subprocess.Popen(['file', '--mime-type', object_name], |
- stdout=subprocess.PIPE, stderr=subprocess.PIPE) |
- output, error = p.communicate() |
- p.stdout.close() |
- p.stderr.close() |
- if p.returncode != 0 or error: |
- raise CommandException( |
- 'Encountered error running "file --mime-type %s" ' |
- '(returncode=%d).\n%s' % (object_name, p.returncode, error)) |
- # Parse output by removing line delimiter and splitting on last ": |
- content_type = output.rstrip().rpartition(': ')[2] |
- else: |
- content_type = mimetypes.guess_type(object_name)[0] |
- if not content_type: |
- content_type = DEFAULT_CONTENT_TYPE |
- dst_obj_metadata.contentType = content_type |
- |
- |
-# pylint: disable=undefined-variable |
-def _UploadFileToObjectNonResumable(src_url, src_obj_filestream, |
- src_obj_size, dst_url, dst_obj_metadata, |
- preconditions, gsutil_api, logger): |
- """Uploads the file using a non-resumable strategy. |
- |
- Args: |
- src_url: Source StorageUrl to upload. |
- src_obj_filestream: File pointer to uploadable bytes. |
- src_obj_size: Size of the source object. |
- dst_url: Destination StorageUrl for the upload. |
- dst_obj_metadata: Metadata for the target object. |
- preconditions: Preconditions for the upload, if any. |
- gsutil_api: gsutil Cloud API instance to use for the upload. |
- logger: For outputting log messages. |
- |
- Returns: |
- Elapsed upload time, uploaded Object with generation, md5, and size fields |
- populated. |
- """ |
- progress_callback = FileProgressCallbackHandler( |
- ConstructAnnounceText('Uploading', dst_url.url_string), logger).call |
- if global_copy_helper_opts.test_callback_file: |
- with open(global_copy_helper_opts.test_callback_file, 'rb') as test_fp: |
- progress_callback = pickle.loads(test_fp.read()).call |
- start_time = time.time() |
- |
- if src_url.IsStream(): |
- # TODO: gsutil-beta: Provide progress callbacks for streaming uploads. |
- uploaded_object = gsutil_api.UploadObjectStreaming( |
- src_obj_filestream, object_metadata=dst_obj_metadata, |
- canned_acl=global_copy_helper_opts.canned_acl, |
- preconditions=preconditions, progress_callback=progress_callback, |
- provider=dst_url.scheme, fields=UPLOAD_RETURN_FIELDS) |
- else: |
- uploaded_object = gsutil_api.UploadObject( |
- src_obj_filestream, object_metadata=dst_obj_metadata, |
- canned_acl=global_copy_helper_opts.canned_acl, size=src_obj_size, |
- preconditions=preconditions, progress_callback=progress_callback, |
- provider=dst_url.scheme, fields=UPLOAD_RETURN_FIELDS) |
- end_time = time.time() |
- elapsed_time = end_time - start_time |
- |
- return elapsed_time, uploaded_object |
- |
- |
-# pylint: disable=undefined-variable |
-def _UploadFileToObjectResumable(src_url, src_obj_filestream, |
- src_obj_size, dst_url, dst_obj_metadata, |
- preconditions, gsutil_api, logger): |
- """Uploads the file using a resumable strategy. |
- |
- Args: |
- src_url: Source FileUrl to upload. Must not be a stream. |
- src_obj_filestream: File pointer to uploadable bytes. |
- src_obj_size: Size of the source object. |
- dst_url: Destination StorageUrl for the upload. |
- dst_obj_metadata: Metadata for the target object. |
- preconditions: Preconditions for the upload, if any. |
- gsutil_api: gsutil Cloud API instance to use for the upload. |
- logger: for outputting log messages. |
- |
- Returns: |
- Elapsed upload time, uploaded Object with generation, md5, and size fields |
- populated. |
- """ |
- tracker_file_name = GetTrackerFilePath( |
- dst_url, TrackerFileType.UPLOAD, |
- gsutil_api.GetApiSelector(provider=dst_url.scheme)) |
- |
- def _UploadTrackerCallback(serialization_data): |
- """Creates a new tracker file for starting an upload from scratch. |
- |
- This function is called by the gsutil Cloud API implementation, and the
- serialization data is implementation-specific.
- |
- Args: |
- serialization_data: Serialization data used in resuming the upload. |
- """ |
- tracker_file = None |
- try: |
- tracker_file = open(tracker_file_name, 'w') |
- tracker_file.write(str(serialization_data)) |
- except IOError as e: |
- RaiseUnwritableTrackerFileException(tracker_file_name, e.strerror) |
- finally: |
- if tracker_file: |
- tracker_file.close() |
- |
- # This contains the upload URL, which will uniquely identify the |
- # destination object. |
- tracker_data = _GetUploadTrackerData(tracker_file_name, logger) |
- if tracker_data: |
- logger.info( |
- 'Resuming upload for %s', src_url.url_string) |
- |
- retryable = True |
- |
- progress_callback = FileProgressCallbackHandler( |
- ConstructAnnounceText('Uploading', dst_url.url_string), logger).call |
- if global_copy_helper_opts.test_callback_file: |
- with open(global_copy_helper_opts.test_callback_file, 'rb') as test_fp: |
- progress_callback = pickle.loads(test_fp.read()).call |
- |
- start_time = time.time() |
- num_startover_attempts = 0 |
- # This loop causes us to retry when the resumable upload failed in a way
- # that requires starting over with a new upload ID. Retries within a single
- # upload ID within the current process are handled in
- # gsutil_api.UploadObjectResumable, and retries within a single upload ID
- # spanning processes happen when an exception not caught below occurs (which
- # leaves the tracker file in place, causing the upload ID to be reused the
- # next time the user runs gsutil and attempts the same upload).
- while retryable: |
- try: |
- uploaded_object = gsutil_api.UploadObjectResumable( |
- src_obj_filestream, object_metadata=dst_obj_metadata, |
- canned_acl=global_copy_helper_opts.canned_acl, |
- preconditions=preconditions, provider=dst_url.scheme, |
- size=src_obj_size, serialization_data=tracker_data, |
- fields=UPLOAD_RETURN_FIELDS, |
- tracker_callback=_UploadTrackerCallback, |
- progress_callback=progress_callback) |
- retryable = False |
- except ResumableUploadStartOverException as e:
- # This can happen, for example, if the server sends a 410 response code. |
- # In that case the current resumable upload ID can't be reused, so delete |
- # the tracker file and try again up to max retries. |
- num_startover_attempts += 1 |
- retryable = (num_startover_attempts < GetNumRetries()) |
- if not retryable: |
- raise |
- |
- # If the server sends a 404 response code, then the upload should only |
- # be restarted if it was the object (and not the bucket) that was missing. |
- try: |
- gsutil_api.GetBucket(dst_obj_metadata.bucket, provider=dst_url.scheme) |
- except NotFoundException: |
- raise |
- |
- logger.info('Restarting upload from scratch after exception %s', e) |
- DeleteTrackerFile(tracker_file_name) |
- tracker_data = None |
- src_obj_filestream.seek(0) |
- # Reset the progress callback handler. |
- progress_callback = FileProgressCallbackHandler( |
- ConstructAnnounceText('Uploading', dst_url.url_string), logger).call |
- logger.info('\n'.join(textwrap.wrap( |
- 'Resumable upload of %s failed with a response code indicating we ' |
- 'need to start over with a new resumable upload ID. Backing off ' |
- 'and retrying.' % src_url.url_string))) |
- time.sleep(min(random.random() * (2 ** num_startover_attempts), |
- GetMaxRetryDelay())) |
- except ResumableUploadAbortException: |
- retryable = False |
- raise |
- finally: |
- if not retryable: |
- DeleteTrackerFile(tracker_file_name) |
- |
- end_time = time.time() |
- elapsed_time = end_time - start_time |
- |
- return (elapsed_time, uploaded_object) |
- |
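-
-# Editor's sketch (not part of the original module): the start-over retry
-# loop above sleeps for a randomized, exponentially growing delay capped at
-# GetMaxRetryDelay(). Successive attempts draw from [0, 2), [0, 4), [0, 8)
-# seconds, and so on. The helper name and max_delay default are assumptions.
-def _ExampleStartoverDelay(num_startover_attempts, max_delay=32):
- """Returns the sleep used after the given start-over attempt (sketch)."""
- return min(random.random() * (2 ** num_startover_attempts), max_delay)
-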
- |
-def _CompressFileForUpload(src_url, src_obj_filestream, src_obj_size, logger): |
- """Compresses a to-be-uploaded local file to save bandwidth. |
- |
- Args: |
- src_url: Source FileUrl. |
- src_obj_filestream: Read stream of the source file - will be consumed |
- and closed. |
- src_obj_size: Size of the source file. |
- logger: for outputting log messages. |
- |
- Returns: |
- StorageUrl path to compressed file, compressed file size. |
- """ |
- # TODO: Compress using a streaming model as opposed to all at once here. |
- if src_obj_size >= MIN_SIZE_COMPUTE_LOGGING: |
- logger.info( |
- 'Compressing %s (to tmp)...', src_url) |
- (gzip_fh, gzip_path) = tempfile.mkstemp() |
- gzip_fp = None |
- try: |
- # Check for temp space. Assume the compressed object is at most 2x
- # the size of the source object (it should normally compress to a size
- # smaller than the source object).
- if _CheckFreeSpace(gzip_path) < 2*int(src_obj_size): |
- raise CommandException('Inadequate temp space available to compress ' |
- '%s. See the CHANGING TEMP DIRECTORIES section ' |
- 'of "gsutil help cp" for more info.' % src_url) |
- gzip_fp = gzip.open(gzip_path, 'wb') |
- data = src_obj_filestream.read(GZIP_CHUNK_SIZE) |
- while data: |
- gzip_fp.write(data) |
- data = src_obj_filestream.read(GZIP_CHUNK_SIZE) |
- finally: |
- if gzip_fp: |
- gzip_fp.close() |
- os.close(gzip_fh) |
- src_obj_filestream.close() |
- gzip_size = os.path.getsize(gzip_path) |
- return StorageUrlFromString(gzip_path), gzip_size |
- |
- |
-def _UploadFileToObject(src_url, src_obj_filestream, src_obj_size, |
- dst_url, dst_obj_metadata, preconditions, gsutil_api, |
- logger, command_obj, copy_exception_handler, |
- gzip_exts=None, allow_splitting=True): |
- """Uploads a local file to an object. |
- |
- Args: |
- src_url: Source FileUrl. |
- src_obj_filestream: Read stream of the source file to be read and closed. |
- src_obj_size: Size of the source file. |
- dst_url: Destination CloudUrl. |
- dst_obj_metadata: Metadata to be applied to the destination object. |
- preconditions: Preconditions to use for the copy. |
- gsutil_api: gsutil Cloud API to use for the copy. |
- logger: for outputting log messages. |
- command_obj: command object for use in Apply in parallel composite uploads. |
- copy_exception_handler: For handling copy exceptions during Apply. |
- gzip_exts: List of file extensions to gzip prior to upload, if any. |
- allow_splitting: Whether to allow the file to be split into component |
- pieces for a parallel composite upload.
- |
- Returns: |
- (elapsed_time, bytes_transferred, dst_url with generation, |
- md5 hash of destination) excluding overhead like initial GET. |
- |
- Raises: |
- CommandException: if errors encountered. |
- """ |
- if not dst_obj_metadata or not dst_obj_metadata.contentLanguage: |
- content_language = config.get_value('GSUtil', 'content_language') |
- if content_language: |
- dst_obj_metadata.contentLanguage = content_language |
- |
- fname_parts = src_url.object_name.split('.') |
- upload_url = src_url |
- upload_stream = src_obj_filestream |
- upload_size = src_obj_size |
- zipped_file = False |
- if gzip_exts and len(fname_parts) > 1 and fname_parts[-1] in gzip_exts: |
- upload_url, upload_size = _CompressFileForUpload( |
- src_url, src_obj_filestream, src_obj_size, logger) |
- upload_stream = open(upload_url.object_name, 'rb') |
- dst_obj_metadata.contentEncoding = 'gzip' |
- zipped_file = True |
- |
- elapsed_time = None |
- uploaded_object = None |
- hash_algs = GetUploadHashAlgs() |
- digesters = dict((alg, hash_algs[alg]()) for alg in hash_algs or {}) |
- |
- parallel_composite_upload = _ShouldDoParallelCompositeUpload( |
- logger, allow_splitting, upload_url, dst_url, src_obj_size, |
- canned_acl=global_copy_helper_opts.canned_acl) |
- |
- if (src_url.IsStream() and |
- gsutil_api.GetApiSelector(provider=dst_url.scheme) == ApiSelector.JSON): |
- orig_stream = upload_stream |
- # Add limited seekable properties to the stream via buffering. |
- upload_stream = ResumableStreamingJsonUploadWrapper( |
- orig_stream, GetJsonResumableChunkSize()) |
- |
- if not parallel_composite_upload and len(hash_algs): |
- # Parallel composite uploads calculate hashes per-component in subsequent |
- # calls to this function, but the composition of the final object is a |
- # cloud-only operation. |
- wrapped_filestream = HashingFileUploadWrapper(upload_stream, digesters, |
- hash_algs, upload_url, logger) |
- else: |
- wrapped_filestream = upload_stream |
- |
- try: |
- if parallel_composite_upload: |
- elapsed_time, uploaded_object = _DoParallelCompositeUpload( |
- upload_stream, upload_url, dst_url, dst_obj_metadata, |
- global_copy_helper_opts.canned_acl, upload_size, preconditions, |
- gsutil_api, command_obj, copy_exception_handler) |
- elif upload_size < ResumableThreshold() or src_url.IsStream(): |
- elapsed_time, uploaded_object = _UploadFileToObjectNonResumable( |
- upload_url, wrapped_filestream, upload_size, dst_url, |
- dst_obj_metadata, preconditions, gsutil_api, logger) |
- else: |
- elapsed_time, uploaded_object = _UploadFileToObjectResumable( |
- upload_url, wrapped_filestream, upload_size, dst_url, |
- dst_obj_metadata, preconditions, gsutil_api, logger) |
- |
- finally: |
- if zipped_file: |
- try: |
- os.unlink(upload_url.object_name) |
- # Windows sometimes complains the temp file is locked when you try to |
- # delete it. |
- except Exception: # pylint: disable=broad-except |
- logger.warning( |
- 'Could not delete %s. This can occur in Windows because the ' |
- 'temporary file is still locked.', upload_url.object_name) |
- # In the gzip case, this is the gzip stream. _CompressFileForUpload will |
- # have already closed the original source stream. |
- upload_stream.close() |
- |
- if not parallel_composite_upload: |
- try: |
- digests = _CreateDigestsFromDigesters(digesters) |
- _CheckHashes(logger, dst_url, uploaded_object, src_url.object_name, |
- digests, is_upload=True) |
- except HashMismatchException: |
- if _RENAME_ON_HASH_MISMATCH: |
- corrupted_obj_metadata = apitools_messages.Object( |
- name=dst_obj_metadata.name, |
- bucket=dst_obj_metadata.bucket, |
- etag=uploaded_object.etag) |
- dst_obj_metadata.name = (dst_url.object_name + |
- _RENAME_ON_HASH_MISMATCH_SUFFIX) |
- gsutil_api.CopyObject(corrupted_obj_metadata, |
- dst_obj_metadata, provider=dst_url.scheme) |
- # If the digest doesn't match, delete the object. |
- gsutil_api.DeleteObject(dst_url.bucket_name, dst_url.object_name, |
- generation=uploaded_object.generation, |
- provider=dst_url.scheme) |
- raise |
- |
- result_url = dst_url.Clone() |
- |
- result_url.generation = GenerationFromUrlAndString( |
- result_url, uploaded_object.generation) |
- |
- return (elapsed_time, uploaded_object.size, result_url, |
- uploaded_object.md5Hash) |
- |
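-
-# Editor's sketch (not part of the original module): how gzip_exts selects
-# files for on-the-fly compression in _UploadFileToObject above. Only the
-# final extension is consulted, e.g. 'logs/app.2014.txt' yields 'txt'. The
-# helper name is illustrative only.
-def _ExampleShouldGzip(object_name, gzip_exts):
- """Returns True if the file's last extension is in gzip_exts (sketch)."""
- fname_parts = object_name.split('.')
- return (bool(gzip_exts) and len(fname_parts) > 1
- and fname_parts[-1] in gzip_exts)
-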
- |
-# TODO: Refactor this long function into smaller pieces. |
-# pylint: disable=too-many-statements |
-def _DownloadObjectToFile(src_url, src_obj_metadata, dst_url, |
- gsutil_api, logger, test_method=None): |
- """Downloads an object to a local file. |
- |
- Args: |
- src_url: Source CloudUrl. |
- src_obj_metadata: Metadata from the source object. |
- dst_url: Destination FileUrl. |
- gsutil_api: gsutil Cloud API instance to use for the download. |
- logger: for outputting log messages. |
- test_method: Optional test method for modifying the file before validation |
- during unit tests. |
- Returns: |
- (elapsed_time, bytes_transferred, dst_url, md5), excluding overhead like |
- initial GET. |
- |
- Raises: |
- CommandException: if errors encountered. |
- """ |
- global open_files_map |
- file_name = dst_url.object_name |
- dir_name = os.path.dirname(file_name) |
- if dir_name and not os.path.exists(dir_name): |
- # Do dir creation in a try block so we can ignore the case where the dir
- # already exists. This is needed to avoid a race condition when running
- # gsutil -m cp.
- try: |
- os.makedirs(dir_name) |
- except OSError as e:
- if e.errno != errno.EEXIST: |
- raise |
- api_selector = gsutil_api.GetApiSelector(provider=src_url.scheme) |
- # For gzipped objects, download to a temp file and unzip. For the XML API,
- # this represents the result of a HEAD request. For the JSON API, this is
- # the stored encoding, which the service may not respect. However, if the
- # server sends decompressed bytes for a file that is stored compressed
- # (double compressed case), there is no way we can validate the hash and
- # we will fail our hash check for the object.
- if (src_obj_metadata.contentEncoding and |
- src_obj_metadata.contentEncoding.lower().endswith('gzip')): |
- # We can't use tempfile.mkstemp() here because we need a predictable |
- # filename for resumable downloads. |
- download_file_name = _GetDownloadZipFileName(file_name) |
- logger.info( |
- 'Downloading to temp gzip filename %s', download_file_name) |
- need_to_unzip = True |
- else: |
- download_file_name = file_name |
- need_to_unzip = False |
- |
- if download_file_name.endswith(dst_url.delim): |
- logger.warn('\n'.join(textwrap.wrap( |
- 'Skipping attempt to download to filename ending with slash (%s). This ' |
- 'typically happens when using gsutil to download from a subdirectory ' |
- 'created by the Cloud Console (https://cloud.google.com/console)' |
- % download_file_name))) |
- return (0, 0, dst_url, '') |
- |
- # Set up hash digesters. |
- hash_algs = GetDownloadHashAlgs( |
- logger, src_has_md5=src_obj_metadata.md5Hash, |
- src_has_crc32c=src_obj_metadata.crc32c) |
- digesters = dict((alg, hash_algs[alg]()) for alg in hash_algs or {}) |
- |
- fp = None |
- # Tracks whether the server used a gzip encoding. |
- server_encoding = None |
- download_complete = False |
- download_strategy = _SelectDownloadStrategy(dst_url) |
- download_start_point = 0 |
- # This is used for resuming downloads, but also for passing the mediaLink |
- # and size into the download for new downloads so that we can avoid |
- # making an extra HTTP call. |
- serialization_data = None |
- serialization_dict = GetDownloadSerializationDict(src_obj_metadata) |
- open_files = [] |
- try: |
- if download_strategy is CloudApi.DownloadStrategy.ONE_SHOT: |
- fp = open(download_file_name, 'wb') |
- elif download_strategy is CloudApi.DownloadStrategy.RESUMABLE: |
- # If this is a resumable download, we need to open the file for append and |
- # manage a tracker file. |
- if open_files_map.get(download_file_name, False): |
- # Ensure another process/thread is not already writing to this file. |
- raise FileConcurrencySkipError |
- open_files.append(download_file_name) |
- open_files_map[download_file_name] = True |
- fp = open(download_file_name, 'ab') |
- |
- resuming = ReadOrCreateDownloadTrackerFile( |
- src_obj_metadata, dst_url, api_selector) |
- if resuming: |
- # Find out how far along we are so we can request the appropriate |
- # remaining range of the object. |
- existing_file_size = GetFileSize(fp, position_to_eof=True) |
- if existing_file_size > src_obj_metadata.size: |
- DeleteTrackerFile(GetTrackerFilePath( |
- dst_url, TrackerFileType.DOWNLOAD, api_selector)) |
- raise CommandException( |
- '%s is larger (%d) than %s (%d).\nDeleting tracker file, so '
- 'if you retry this download it will start from scratch.' %
- (download_file_name, existing_file_size, src_url.object_name, |
- src_obj_metadata.size)) |
- else: |
- if existing_file_size == src_obj_metadata.size: |
- logger.info( |
- 'Download already complete for file %s, skipping download but ' |
- 'will run integrity checks.', download_file_name) |
- download_complete = True |
- else: |
- download_start_point = existing_file_size |
- serialization_dict['progress'] = download_start_point |
- logger.info('Resuming download for %s', src_url.url_string) |
- # Catch up our digester with the hash data. |
- if existing_file_size > TEN_MIB: |
- for alg_name in digesters: |
- logger.info( |
- 'Catching up %s for %s', alg_name, download_file_name) |
- with open(download_file_name, 'rb') as hash_fp: |
- while True: |
- data = hash_fp.read(DEFAULT_FILE_BUFFER_SIZE) |
- if not data: |
- break |
- for alg_name in digesters: |
- digesters[alg_name].update(data) |
- else: |
- # Starting a new download, blow away whatever is already there. |
- fp.truncate(0) |
- fp.seek(0) |
- |
- else: |
- raise CommandException('Invalid download strategy %s chosen for '
- 'file %s' % (download_strategy, fp.name))
- |
- if not dst_url.IsStream(): |
- serialization_data = json.dumps(serialization_dict) |
- |
- progress_callback = FileProgressCallbackHandler( |
- ConstructAnnounceText('Downloading', dst_url.url_string), |
- logger).call |
- if global_copy_helper_opts.test_callback_file: |
- with open(global_copy_helper_opts.test_callback_file, 'rb') as test_fp: |
- progress_callback = pickle.loads(test_fp.read()).call |
- |
- start_time = time.time() |
- # TODO: With gzip encoding (which may occur on-the-fly and not be part of |
- # the object's metadata), when we request a range to resume, it's possible |
- # that the server will just resend the entire object, which means our |
- # caught-up hash will be incorrect. We recalculate the hash on |
- # the local file in the case of a failed gzip hash anyway, but it would |
- # be better if we actively detected this case. |
- if not download_complete: |
- server_encoding = gsutil_api.GetObjectMedia( |
- src_url.bucket_name, src_url.object_name, fp, |
- start_byte=download_start_point, generation=src_url.generation, |
- object_size=src_obj_metadata.size, |
- download_strategy=download_strategy, provider=src_url.scheme, |
- serialization_data=serialization_data, digesters=digesters, |
- progress_callback=progress_callback) |
- |
- end_time = time.time() |
- |
- # If a custom test method is defined, call it here. For the copy command, |
- # test methods are expected to take one argument: an open file pointer, |
- # and are used to perturb the open file during download to exercise |
- # download error detection. |
- if test_method: |
- test_method(fp) |
- |
- except ResumableDownloadException as e: |
- logger.warning('Caught ResumableDownloadException (%s) for file %s.', |
- e.reason, file_name) |
- raise |
- |
- finally: |
- if fp: |
- fp.close() |
- for file_name in open_files: |
- open_files_map.delete(file_name) |
- |
- # If we decompressed a content-encoding gzip file on the fly, this may not |
- # be accurate, but it is the best we can do without going deep into the |
- # underlying HTTP libraries. Note that this value is only used for |
- # reporting in log messages; inaccuracy doesn't impact the integrity of the |
- # download. |
- bytes_transferred = src_obj_metadata.size - download_start_point |
- server_gzip = server_encoding and server_encoding.lower().endswith('gzip') |
- local_md5 = _ValidateDownloadHashes(logger, src_url, src_obj_metadata, |
- dst_url, need_to_unzip, server_gzip, |
- digesters, hash_algs, api_selector, |
- bytes_transferred) |
- |
- return (end_time - start_time, bytes_transferred, dst_url, local_md5) |
- |
- |
-def _GetDownloadZipFileName(file_name): |
- """Returns the file name for a temporarily compressed downloaded file.""" |
- return '%s_.gztmp' % file_name |
- |
- |
-def _ValidateDownloadHashes(logger, src_url, src_obj_metadata, dst_url, |
- need_to_unzip, server_gzip, digesters, hash_algs, |
- api_selector, bytes_transferred): |
- """Validates a downloaded file's integrity. |
- |
- Args: |
- logger: For outputting log messages. |
- src_url: StorageUrl for the source object. |
- src_obj_metadata: Metadata for the source object, potentially containing |
- hash values. |
- dst_url: StorageUrl describing the destination file. |
- need_to_unzip: If true, a temporary zip file was used and must be |
- uncompressed as part of validation. |
- server_gzip: If true, the server gzipped the bytes (regardless of whether |
- the object metadata claimed it was gzipped). |
- digesters: dict of {string, hash digester} that contains up-to-date digests |
- computed during the download. If a digester for a particular |
- algorithm is None, an up-to-date digest is not available and the |
- hash must be recomputed from the local file. |
- hash_algs: dict of {string, hash algorithm} that can be used if digesters |
- don't have up-to-date digests. |
- api_selector: The Cloud API implementation used (affects tracker file naming).
- bytes_transferred: Number of bytes downloaded (used for logging). |
- |
- Returns: |
- An MD5 of the local file, if one was calculated as part of the integrity |
- check. |
- """ |
- file_name = dst_url.object_name |
- download_file_name = (_GetDownloadZipFileName(file_name) if need_to_unzip else |
- file_name) |
- digesters_succeeded = True |
- for alg in digesters: |
- # If we get a digester with a None algorithm, the underlying |
- # implementation failed to calculate a digest, so we will need to |
- # calculate one from scratch. |
- if not digesters[alg]: |
- digesters_succeeded = False |
- break |
- |
- if digesters_succeeded: |
- local_hashes = _CreateDigestsFromDigesters(digesters) |
- else: |
- local_hashes = _CreateDigestsFromLocalFile( |
- logger, hash_algs, download_file_name, src_obj_metadata) |
- |
- digest_verified = True |
- hash_invalid_exception = None |
- try: |
- _CheckHashes(logger, src_url, src_obj_metadata, download_file_name, |
- local_hashes) |
- DeleteTrackerFile(GetTrackerFilePath( |
- dst_url, TrackerFileType.DOWNLOAD, api_selector)) |
- except HashMismatchException as e:
- # If a non-gzipped object gets sent with gzip content encoding, the hash
- # we calculate will match the gzipped bytes, not the original object. Thus, |
- # we'll need to calculate and check it after unzipping. |
- if server_gzip: |
- logger.debug( |
- 'Hash did not match but server gzipped the content, will ' |
- 'recalculate.') |
- digest_verified = False |
- elif api_selector == ApiSelector.XML: |
- logger.debug( |
- 'Hash did not match but server may have gzipped the content, will ' |
- 'recalculate.') |
- # Save off the exception in case this isn't a gzipped file. |
- hash_invalid_exception = e |
- digest_verified = False |
- else: |
- DeleteTrackerFile(GetTrackerFilePath( |
- dst_url, TrackerFileType.DOWNLOAD, api_selector)) |
- if _RENAME_ON_HASH_MISMATCH: |
- os.rename(download_file_name, |
- download_file_name + _RENAME_ON_HASH_MISMATCH_SUFFIX) |
- else: |
- os.unlink(download_file_name) |
- raise |
- |
- if server_gzip and not need_to_unzip: |
- # Server compressed bytes on-the-fly, thus we need to rename and decompress. |
- # We can't decompress on-the-fly because prior to Python 3.2 the gzip |
- # module makes a bunch of seek calls on the stream. |
- download_file_name = _GetDownloadZipFileName(file_name) |
- os.rename(file_name, download_file_name) |
- |
- if need_to_unzip or server_gzip: |
- # Log that we're uncompressing if the file is big enough that |
- # decompressing would make it look like the transfer "stalled" at the end. |
- if bytes_transferred > TEN_MIB: |
- logger.info( |
- 'Uncompressing downloaded tmp file to %s...', file_name) |
- |
- # Downloaded gzipped file to a filename w/o .gz extension, so unzip. |
- gzip_fp = None |
- try: |
- gzip_fp = gzip.open(download_file_name, 'rb') |
- with open(file_name, 'wb') as f_out: |
- data = gzip_fp.read(GZIP_CHUNK_SIZE) |
- while data: |
- f_out.write(data) |
- data = gzip_fp.read(GZIP_CHUNK_SIZE) |
- except IOError as e:
- # In the XML case where we don't know if the file was gzipped, raise |
- # the original hash exception if we find that it wasn't. |
- if 'Not a gzipped file' in str(e) and hash_invalid_exception: |
- # Linter improperly thinks we're raising None despite the above check. |
- # pylint: disable=raising-bad-type |
- raise hash_invalid_exception |
- finally: |
- if gzip_fp: |
- gzip_fp.close() |
- |
- os.unlink(download_file_name) |
- |
- if not digest_verified: |
- try: |
- # Recalculate hashes on the unzipped local file. |
- local_hashes = _CreateDigestsFromLocalFile(logger, hash_algs, file_name, |
- src_obj_metadata) |
- _CheckHashes(logger, src_url, src_obj_metadata, file_name, local_hashes) |
- DeleteTrackerFile(GetTrackerFilePath( |
- dst_url, TrackerFileType.DOWNLOAD, api_selector)) |
- except HashMismatchException: |
- DeleteTrackerFile(GetTrackerFilePath( |
- dst_url, TrackerFileType.DOWNLOAD, api_selector)) |
- if _RENAME_ON_HASH_MISMATCH: |
- os.rename(file_name, |
- file_name + _RENAME_ON_HASH_MISMATCH_SUFFIX) |
- else: |
- os.unlink(file_name) |
- raise |
- |
- if 'md5' in local_hashes: |
- return local_hashes['md5'] |
- |
- |
-def _CopyFileToFile(src_url, dst_url): |
- """Copies a local file to a local file. |
- |
- Args: |
- src_url: Source FileUrl. |
- dst_url: Destination FileUrl. |
- Returns: |
- (elapsed_time, bytes_transferred, dst_url, md5=None). |
- |
- Raises: |
- CommandException: if errors encountered. |
- """ |
- src_fp = GetStreamFromFileUrl(src_url) |
- dir_name = os.path.dirname(dst_url.object_name) |
- if dir_name and not os.path.exists(dir_name): |
- os.makedirs(dir_name) |
- dst_fp = open(dst_url.object_name, 'wb') |
- start_time = time.time() |
- shutil.copyfileobj(src_fp, dst_fp) |
- end_time = time.time() |
- return (end_time - start_time, os.path.getsize(dst_url.object_name), |
- dst_url, None) |
- |
- |
-def _DummyTrackerCallback(_): |
- pass |
- |
- |
-# pylint: disable=undefined-variable |
-def _CopyObjToObjDaisyChainMode(src_url, src_obj_metadata, dst_url, |
- dst_obj_metadata, preconditions, gsutil_api, |
- logger): |
- """Copies from src_url to dst_url in "daisy chain" mode. |
- |
- See -D OPTION documentation about what daisy chain mode is. |
- |
- Args: |
- src_url: Source CloudUrl |
- src_obj_metadata: Metadata from source object |
- dst_url: Destination CloudUrl |
- dst_obj_metadata: Object-specific metadata that should be overridden during
- the copy. |
- preconditions: Preconditions to use for the copy. |
- gsutil_api: gsutil Cloud API to use for the copy. |
- logger: For outputting log messages. |
- |
- Returns: |
- (elapsed_time, bytes_transferred, dst_url with generation, |
- md5 hash of destination) excluding overhead like initial GET. |
- |
- Raises: |
- CommandException: if errors encountered. |
- """ |
- # We don't attempt to preserve ACLs across providers because |
- # GCS and S3 support different ACLs and disjoint principals. |
- if (global_copy_helper_opts.preserve_acl |
- and src_url.scheme != dst_url.scheme): |
- raise NotImplementedError( |
- 'Cross-provider cp -p not supported') |
- if not global_copy_helper_opts.preserve_acl: |
- dst_obj_metadata.acl = [] |
- |
- # Don't use callbacks for downloads on the daisy chain wrapper because |
- # upload callbacks will output progress, but respect test hooks if present. |
- progress_callback = None |
- if global_copy_helper_opts.test_callback_file: |
- with open(global_copy_helper_opts.test_callback_file, 'rb') as test_fp: |
- progress_callback = pickle.loads(test_fp.read()).call |
- |
- start_time = time.time() |
- upload_fp = DaisyChainWrapper(src_url, src_obj_metadata.size, gsutil_api, |
- progress_callback=progress_callback) |
- uploaded_object = None |
- if src_obj_metadata.size == 0: |
- # Resumable uploads of size 0 are not supported. |
- uploaded_object = gsutil_api.UploadObject( |
- upload_fp, object_metadata=dst_obj_metadata, |
- canned_acl=global_copy_helper_opts.canned_acl, |
- preconditions=preconditions, provider=dst_url.scheme, |
- fields=UPLOAD_RETURN_FIELDS, size=src_obj_metadata.size) |
- else: |
- # TODO: Support process-break resumes. This will resume across connection |
- # breaks and server errors, but the tracker callback is a no-op so this |
- # won't resume across gsutil runs. |
- # TODO: Test retries via test_callback_file. |
- uploaded_object = gsutil_api.UploadObjectResumable( |
- upload_fp, object_metadata=dst_obj_metadata, |
- canned_acl=global_copy_helper_opts.canned_acl, |
- preconditions=preconditions, provider=dst_url.scheme, |
- fields=UPLOAD_RETURN_FIELDS, size=src_obj_metadata.size, |
- progress_callback=FileProgressCallbackHandler( |
- ConstructAnnounceText('Uploading', dst_url.url_string), |
- logger).call, |
- tracker_callback=_DummyTrackerCallback) |
- end_time = time.time() |
- |
- try: |
- _CheckCloudHashes(logger, src_url, dst_url, src_obj_metadata, |
- uploaded_object) |
- except HashMismatchException: |
- if _RENAME_ON_HASH_MISMATCH: |
- corrupted_obj_metadata = apitools_messages.Object( |
- name=dst_obj_metadata.name, |
- bucket=dst_obj_metadata.bucket, |
- etag=uploaded_object.etag) |
- dst_obj_metadata.name = (dst_url.object_name + |
- _RENAME_ON_HASH_MISMATCH_SUFFIX) |
- gsutil_api.CopyObject(corrupted_obj_metadata, |
- dst_obj_metadata, provider=dst_url.scheme) |
- # If the digest doesn't match, delete the object. |
- gsutil_api.DeleteObject(dst_url.bucket_name, dst_url.object_name, |
- generation=uploaded_object.generation, |
- provider=dst_url.scheme) |
- raise |
- |
- result_url = dst_url.Clone() |
- result_url.generation = GenerationFromUrlAndString( |
- result_url, uploaded_object.generation) |
- |
- return (end_time - start_time, src_obj_metadata.size, result_url, |
- uploaded_object.md5Hash) |
- |
- |
-# pylint: disable=undefined-variable |
-# pylint: disable=too-many-statements |
-def PerformCopy(logger, src_url, dst_url, gsutil_api, command_obj, |
- copy_exception_handler, allow_splitting=True, |
- headers=None, manifest=None, gzip_exts=None, test_method=None): |
- """Performs copy from src_url to dst_url, handling various special cases. |
- |
- Args: |
- logger: for outputting log messages. |
- src_url: Source StorageUrl. |
- dst_url: Destination StorageUrl. |
- gsutil_api: gsutil Cloud API instance to use for the copy. |
- command_obj: command object for use in Apply in parallel composite uploads. |
- copy_exception_handler: for handling copy exceptions during Apply. |
- allow_splitting: Whether to allow the file to be split into component |
- pieces for a parallel composite upload.
- headers: optional headers to use for the copy operation. |
- manifest: optional manifest for tracking copy operations. |
- gzip_exts: List of file extensions to gzip for uploads, if any. |
- test_method: optional test method for modifying files during unit tests. |
- |
- Returns: |
- (elapsed_time, bytes_transferred, version-specific dst_url) excluding |
- overhead like initial GET. |
- |
- Raises: |
- ItemExistsError: if no clobber flag is specified and the destination |
- object already exists. |
- SkipUnsupportedObjectError: if skip_unsupported_objects flag is specified |
- and the source is an unsupported type. |
- CommandException: if other errors encountered. |
- """ |
- if headers: |
- dst_obj_headers = headers.copy() |
- else: |
- dst_obj_headers = {} |
- |
- # Create a metadata instance for each destination object so metadata |
- # such as content-type can be applied per-object. |
- # Initialize metadata from any headers passed in via -h. |
- dst_obj_metadata = ObjectMetadataFromHeaders(dst_obj_headers) |
- |
- if dst_url.IsCloudUrl() and dst_url.scheme == 'gs': |
- preconditions = PreconditionsFromHeaders(dst_obj_headers) |
- else: |
- preconditions = Preconditions() |
- |
- src_obj_metadata = None |
- src_obj_filestream = None |
- if src_url.IsCloudUrl(): |
- src_obj_fields = None |
- if dst_url.IsCloudUrl(): |
- # For cloud or daisy chain copy, we need every copyable field. |
- # If we're not modifying or overriding any of the fields, we can get |
- # away without retrieving the object metadata because the copy |
- # operation can succeed with just the destination bucket and object |
- # name. But if we are sending any metadata, the JSON API will expect a |
- # complete object resource. Since we want metadata like the object size |
- # for our own tracking, we just get all of the metadata here. |
- src_obj_fields = ['cacheControl', 'componentCount', |
- 'contentDisposition', 'contentEncoding', |
- 'contentLanguage', 'contentType', 'crc32c', |
- 'etag', 'generation', 'md5Hash', 'mediaLink', |
- 'metadata', 'metageneration', 'size'] |
- # We only need the ACL if we're going to preserve it. |
- if global_copy_helper_opts.preserve_acl: |
- src_obj_fields.append('acl') |
- if (src_url.scheme == dst_url.scheme |
- and not global_copy_helper_opts.daisy_chain): |
- copy_in_the_cloud = True |
- else: |
- copy_in_the_cloud = False |
- else: |
- # Just get the fields needed to validate the download. |
- src_obj_fields = ['crc32c', 'contentEncoding', 'contentType', 'etag', |
- 'mediaLink', 'md5Hash', 'size'] |
- |
- if (src_url.scheme == 's3' and |
- global_copy_helper_opts.skip_unsupported_objects): |
- src_obj_fields.append('storageClass') |
- |
- try: |
- src_generation = GenerationFromUrlAndString(src_url, src_url.generation) |
- src_obj_metadata = gsutil_api.GetObjectMetadata( |
- src_url.bucket_name, src_url.object_name, |
- generation=src_generation, provider=src_url.scheme, |
- fields=src_obj_fields) |
- except NotFoundException: |
- raise CommandException( |
- 'NotFoundException: Could not retrieve source object %s.' % |
- src_url.url_string) |
- if (src_url.scheme == 's3' and |
- global_copy_helper_opts.skip_unsupported_objects and |
- src_obj_metadata.storageClass == 'GLACIER'): |
- raise SkipGlacierError() |
- |
- src_obj_size = src_obj_metadata.size |
- dst_obj_metadata.contentType = src_obj_metadata.contentType |
- if global_copy_helper_opts.preserve_acl: |
- dst_obj_metadata.acl = src_obj_metadata.acl |
- # Special case for S3-to-S3 copy URLs using |
- # global_copy_helper_opts.preserve_acl. |
- # dst_url will be verified in _CopyObjToObjDaisyChainMode if it |
- # is not s3 (and thus differs from src_url). |
- if src_url.scheme == 's3': |
- acl_text = S3MarkerAclFromObjectMetadata(src_obj_metadata) |
- if acl_text: |
- AddS3MarkerAclToObjectMetadata(dst_obj_metadata, acl_text) |
- else: |
- try: |
- src_obj_filestream = GetStreamFromFileUrl(src_url) |
- except Exception as e: # pylint: disable=broad-except
- raise CommandException('Error opening file "%s": %s.' % (src_url, |
- e.message)) |
- if src_url.IsStream(): |
- src_obj_size = None |
- else: |
- src_obj_size = os.path.getsize(src_url.object_name) |
- |
- if global_copy_helper_opts.use_manifest: |
- # Set the source size in the manifest. |
- manifest.Set(src_url.url_string, 'size', src_obj_size) |
- |
- if (dst_url.scheme == 's3' and src_obj_size > S3_MAX_UPLOAD_SIZE
- and src_url.scheme != 's3'):
- raise CommandException( |
- '"%s" exceeds the maximum gsutil-supported size for an S3 upload. S3 ' |
- 'objects greater than %s in size require multipart uploads, which ' |
- 'gsutil does not support.' % (src_url, |
- MakeHumanReadable(S3_MAX_UPLOAD_SIZE))) |
- |
- # On Windows, stdin is opened as text mode instead of binary which causes |
- # problems when piping a binary file, so this switches it to binary mode. |
- if IS_WINDOWS and src_url.IsFileUrl() and src_url.IsStream(): |
- msvcrt.setmode(GetStreamFromFileUrl(src_url).fileno(), os.O_BINARY) |
- |
- if global_copy_helper_opts.no_clobber: |
- # There are two checks to prevent clobbering: |
- # 1) The first check is to see if the URL |
- # already exists at the destination and prevent the upload/download |
- # from happening. This is done by the exists() call. |
- # 2) The second check is only relevant if we are writing to gs. We can |
- # enforce that the server only writes the object if it doesn't exist |
- # by specifying the header below. This check only happens at the |
- # server after the complete file has been uploaded. We specify this |
- # header to prevent a race condition where a destination file may |
- # be created after the first check and before the file is fully |
- # uploaded. |
- # In order to save on unnecessary uploads/downloads we perform both |
- # checks. However, this may come at the cost of additional HTTP calls. |
- if preconditions.gen_match: |
- raise ArgumentException('Specifying x-goog-if-generation-match is ' |
- 'not supported with cp -n') |
- else: |
- preconditions.gen_match = 0 |
- if dst_url.IsFileUrl() and os.path.exists(dst_url.object_name): |
- # The local file may be a partial. Check the file sizes. |
- if src_obj_size == os.path.getsize(dst_url.object_name): |
- raise ItemExistsError() |
- elif dst_url.IsCloudUrl(): |
- try: |
- dst_object = gsutil_api.GetObjectMetadata( |
- dst_url.bucket_name, dst_url.object_name, provider=dst_url.scheme) |
- except NotFoundException: |
- dst_object = None |
- if dst_object: |
- raise ItemExistsError() |
- |
- if dst_url.IsCloudUrl(): |
- # Cloud storage API gets object and bucket name from metadata. |
- dst_obj_metadata.name = dst_url.object_name |
- dst_obj_metadata.bucket = dst_url.bucket_name |
- if src_url.IsCloudUrl(): |
- # Preserve relevant metadata from the source object if it's not already |
- # provided from the headers. |
- CopyObjectMetadata(src_obj_metadata, dst_obj_metadata, override=False) |
- src_obj_metadata.name = src_url.object_name |
- src_obj_metadata.bucket = src_url.bucket_name |
- else: |
- _SetContentTypeFromFile(src_url, dst_obj_metadata) |
- else: |
- # Files don't have Cloud API metadata. |
- dst_obj_metadata = None |
- |
- _LogCopyOperation(logger, src_url, dst_url, dst_obj_metadata) |
- |
- if src_url.IsCloudUrl(): |
- if dst_url.IsFileUrl(): |
- return _DownloadObjectToFile(src_url, src_obj_metadata, dst_url, |
- gsutil_api, logger, test_method=test_method) |
- elif copy_in_the_cloud: |
- return _CopyObjToObjInTheCloud(src_url, src_obj_metadata, dst_url, |
- dst_obj_metadata, preconditions, |
- gsutil_api, logger) |
- else: |
- return _CopyObjToObjDaisyChainMode(src_url, src_obj_metadata, |
- dst_url, dst_obj_metadata, |
- preconditions, gsutil_api, logger) |
- else: # src_url.IsFileUrl() |
- if dst_url.IsCloudUrl(): |
- return _UploadFileToObject( |
- src_url, src_obj_filestream, src_obj_size, dst_url, |
- dst_obj_metadata, preconditions, gsutil_api, logger, command_obj, |
- copy_exception_handler, gzip_exts=gzip_exts, |
- allow_splitting=allow_splitting) |
- else: # dst_url.IsFileUrl() |
- return _CopyFileToFile(src_url, dst_url) |
- |
- |
-class Manifest(object): |
- """Stores the manifest items for the CpCommand class.""" |
- |
- def __init__(self, path): |
- # self.items contains a dictionary of rows |
- self.items = {} |
- self.manifest_filter = {} |
- self.lock = CreateLock() |
- |
- self.manifest_path = os.path.expanduser(path) |
- self._ParseManifest() |
- self._CreateManifestFile() |
- |
- def _ParseManifest(self): |
- """Load and parse a manifest file. |
- |
- This information will be used to skip any files that have a skip or OK |
- status. |
- """ |
- try: |
- if os.path.exists(self.manifest_path): |
- with open(self.manifest_path, 'rb') as f: |
- first_row = True |
- reader = csv.reader(f) |
- for row in reader: |
- if first_row: |
- try: |
- source_index = row.index('Source') |
- result_index = row.index('Result') |
- except ValueError: |
- # No header and thus not a valid manifest file. |
- raise CommandException( |
- 'Missing headers in manifest file: %s' % self.manifest_path) |
- first_row = False |
- source = row[source_index] |
- result = row[result_index] |
- if result in ['OK', 'skip']: |
- # We're always guaranteed to take the last result of a specific |
- # source url. |
- self.manifest_filter[source] = result |
- except IOError: |
- raise CommandException('Could not parse %s' % self.manifest_path) |
- |
- def WasSuccessful(self, src): |
- """Returns whether the specified src url was marked as successful.""" |
- return src in self.manifest_filter |
- |
- def _CreateManifestFile(self): |
- """Opens the manifest file and assigns it to the file pointer.""" |
- try: |
- if ((not os.path.exists(self.manifest_path)) |
- or (os.stat(self.manifest_path).st_size == 0)): |
- # Add headers to the new file. |
- with open(self.manifest_path, 'wb', 1) as f: |
- writer = csv.writer(f) |
- writer.writerow(['Source', |
- 'Destination', |
- 'Start', |
- 'End', |
- 'Md5', |
- 'UploadId', |
- 'Source Size', |
- 'Bytes Transferred', |
- 'Result', |
- 'Description']) |
- except IOError: |
- raise CommandException('Could not create manifest file.') |
- |
- def Set(self, url, key, value): |
- if value is None: |
- # If we don't have any information to set, bail out here so that we
- # don't clobber existing information. To zero out a field, pass ''
- # instead of None.
- return |
- if url in self.items: |
- self.items[url][key] = value |
- else: |
- self.items[url] = {key: value} |
- |
- def Initialize(self, source_url, destination_url): |
- # Always use the source_url as the key for the item. This is unique. |
- self.Set(source_url, 'source_uri', source_url) |
- self.Set(source_url, 'destination_uri', destination_url) |
- self.Set(source_url, 'start_time', datetime.datetime.utcnow()) |
- |
- def SetResult(self, source_url, bytes_transferred, result, |
- description=''): |
- self.Set(source_url, 'bytes', bytes_transferred) |
- self.Set(source_url, 'result', result) |
- self.Set(source_url, 'description', description) |
- self.Set(source_url, 'end_time', datetime.datetime.utcnow()) |
- self._WriteRowToManifestFile(source_url) |
- self._RemoveItemFromManifest(source_url) |
- |
- def _WriteRowToManifestFile(self, url): |
- """Writes a manifest entry to the manifest file for the url argument.""" |
- row_item = self.items[url] |
- data = [ |
- str(row_item['source_uri'].encode(UTF8)), |
- str(row_item['destination_uri'].encode(UTF8)), |
- '%sZ' % row_item['start_time'].isoformat(), |
- '%sZ' % row_item['end_time'].isoformat(), |
- row_item['md5'] if 'md5' in row_item else '', |
- row_item['upload_id'] if 'upload_id' in row_item else '', |
- str(row_item['size']) if 'size' in row_item else '', |
- str(row_item['bytes']) if 'bytes' in row_item else '', |
- row_item['result'], |
- row_item['description'].encode(UTF8)] |
- |
- # Acquire a lock to prevent multiple threads writing to the same file at
- # the same time. This would cause a garbled mess in the manifest file. |
- with self.lock: |
- with open(self.manifest_path, 'a', 1) as f: # 1 == line buffered |
- writer = csv.writer(f) |
- writer.writerow(data) |
- |
- def _RemoveItemFromManifest(self, url): |
- # Remove the item from the dictionary since we're done with it and |
- # we don't want the dictionary to grow too large in memory for no good |
- # reason. |
- del self.items[url] |
- |
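-
-# Editor's note (illustrative, not part of the original module): the manifest
-# produced by the class above is plain CSV with the header row written by
-# _CreateManifestFile. A data row might look like (all values are made up):
-#
-#   file://data.bin,gs://bucket/data.bin,2015-01-01T00:00:00Z,
-#   2015-01-01T00:00:05Z,,,1048576,1048576,OK,
-#
-# On a later run, _ParseManifest only consults the Source and Result columns,
-# skipping any source whose last recorded result was 'OK' or 'skip'.
-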
- |
-class ItemExistsError(Exception): |
- """Exception class for objects that are skipped because they already exist.""" |
- pass |
- |
- |
-class SkipUnsupportedObjectError(Exception): |
- """Exception for objects skipped because they are an unsupported type.""" |
- |
- def __init__(self): |
- super(SkipUnsupportedObjectError, self).__init__() |
- self.unsupported_type = 'Unknown' |
- |
- |
-class SkipGlacierError(SkipUnsupportedObjectError): |
- """Exception for objects skipped because they are an unsupported type.""" |
- |
- def __init__(self): |
- super(SkipGlacierError, self).__init__() |
- self.unsupported_type = 'GLACIER' |
- |
- |
-def GetPathBeforeFinalDir(url): |
- """Returns the path section before the final directory component of the URL. |
- |
- This handles cases for file system directories, buckets, and bucket
- subdirectories. Example: for gs://bucket/dir/ we'll return 'gs://bucket',
- and for file://dir we'll return 'file://'.
- |
- Args: |
- url: StorageUrl representing a filesystem directory, cloud bucket or |
- bucket subdir. |
- |
- Returns: |
- String name of above-described path, sans final path separator. |
- """ |
- sep = url.delim |
- if url.IsFileUrl(): |
- past_scheme = url.url_string[len('file://'):] |
- if past_scheme.find(sep) == -1: |
- return 'file://' |
- else: |
- return 'file://%s' % past_scheme.rstrip(sep).rpartition(sep)[0] |
- if url.IsBucket(): |
- return '%s://' % url.scheme |
- # Else it names a bucket subdir. |
- return url.url_string.rstrip(sep).rpartition(sep)[0] |
- |
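-
-# Editor's examples (not part of the original module) of the mapping
-# implemented by GetPathBeforeFinalDir above, assuming a '/' delimiter for
-# the file URLs:
-#   gs://bucket/            -> 'gs://'
-#   gs://bucket/dir/        -> 'gs://bucket'
-#   gs://bucket/dir/subdir/ -> 'gs://bucket/dir'
-#   file://dir              -> 'file://'
-#   file://tmp/dir          -> 'file://tmp'
-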
- |
-def _DivideAndCeil(dividend, divisor): |
- """Returns ceil(dividend / divisor). |
- |
- Takes care to avoid the pitfalls of floating point arithmetic that could |
- otherwise yield the wrong result for large numbers. |
- |
- Args: |
- dividend: Dividend for the operation. |
- divisor: Divisor for the operation. |
- |
- Returns: |
- Quotient. |
- """ |
- quotient = dividend // divisor |
- if (dividend % divisor) != 0: |
- quotient += 1 |
- return quotient |
- |
- |
-def _GetPartitionInfo(file_size, max_components, default_component_size): |
- """Gets info about a file partition for parallel composite uploads. |
- |
- Args: |
- file_size: The number of bytes in the file to be partitioned. |
- max_components: The maximum number of components that can be composed. |
- default_component_size: The size of a component, assuming that |
- max_components is infinite. |
- Returns: |
- The number of components in the partitioned file, and the size of each |
- component (except the last, which will have a different size iff |
- file_size != 0 (mod num_components)). |
- """ |
- # num_components = ceil(file_size / default_component_size) |
- num_components = _DivideAndCeil(file_size, default_component_size) |
- |
- # num_components must be in the range [2, max_components] |
- num_components = max(min(num_components, max_components), 2) |
- |
- # component_size = ceil(file_size / num_components) |
- component_size = _DivideAndCeil(file_size, num_components) |
- return (num_components, component_size) |
- |
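-
-# Editor's worked example (not part of the original module): for a 150 MiB
-# file with default_component_size of 50 MiB and max_components of 32,
-# _GetPartitionInfo returns 3 components of 50 MiB each; a 10 MiB file is
-# still split into the minimum of 2 components (5 MiB each); a very large
-# file is clamped to max_components. The helper name below is illustrative.
-def _ExamplePartitionFor150MiB():
- """Returns (num_components, component_size) for a 150 MiB file (sketch)."""
- one_mib = 1024 * 1024
- return _GetPartitionInfo(150 * one_mib, 32, 50 * one_mib)
-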
- |
-def _DeleteObjectFn(cls, url_to_delete, thread_state=None): |
- """Wrapper function to be used with command.Apply().""" |
- gsutil_api = GetCloudApiInstance(cls, thread_state) |
- gsutil_api.DeleteObject( |
- url_to_delete.bucket_name, url_to_delete.object_name, |
- generation=url_to_delete.generation, provider=url_to_delete.scheme) |
- |
- |
-def _ParseParallelUploadTrackerFile(tracker_file, tracker_file_lock): |
- """Parse the tracker file from the last parallel composite upload attempt. |
- |
- If it exists, the tracker file is of the format described in |
- _CreateParallelUploadTrackerFile. If the file doesn't exist or cannot be |
- read, then the upload will start from the beginning. |
- |
- Args: |
- tracker_file: The name of the file to parse. |
- tracker_file_lock: Lock protecting access to the tracker file. |
- |
- Returns: |
- random_prefix: A randomly-generated prefix to the name of the |
- temporary components. |
- existing_objects: A list of ObjectFromTracker objects representing |
- the set of files that have already been uploaded. |
- """ |
- |
- def GenerateRandomPrefix(): |
- return str(random.randint(1, (10 ** 10) - 1)) |
- |
- existing_objects = [] |
- try: |
- with tracker_file_lock: |
- with open(tracker_file, 'r') as fp: |
- lines = fp.readlines() |
- lines = [line.strip() for line in lines] |
- if not lines: |
- print('Parallel upload tracker file (%s) was invalid. ' |
- 'Restarting upload from scratch.' % tracker_file) |
- lines = [GenerateRandomPrefix()] |
- |
- except IOError as e: |
- # We can't read the tracker file, so generate a new random prefix. |
- lines = [GenerateRandomPrefix()] |
- |
- # Ignore non-existent file (happens first time an upload |
- # is attempted on a file), but warn user for other errors. |
- if e.errno != errno.ENOENT: |
- # Will restart because we failed to read in the file. |
- print('Couldn\'t read parallel upload tracker file (%s): %s. ' |
- 'Restarting upload from scratch.' % (tracker_file, e.strerror)) |
- |
- # The first line contains the randomly-generated prefix. |
- random_prefix = lines[0] |
- |
- # The remaining lines were written in pairs to describe a single component |
- # in the form: |
- # object_name (without random prefix) |
- # generation |
- # Newlines are used as the delimiter because only newlines and carriage |
- # returns are invalid characters in object names, and users can specify |
- # a custom prefix in the config file. |
- i = 1 |
- while i < len(lines): |
- (name, generation) = (lines[i], lines[i+1]) |
- if not generation: |
- # Cover the '' case. |
- generation = None |
- existing_objects.append(ObjectFromTracker(name, generation)) |
- i += 2 |
- return (random_prefix, existing_objects) |
- |
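-
-# Editor's illustration (not part of the original module): a parallel upload
-# tracker file in the format parsed above is newline-delimited, e.g. (values
-# are made up):
-#
-#   4812007355
-#   my_file.bin_0
-#   1427925464116000
-#   my_file.bin_1
-#   1427925464999000
-#
-# The first line is the random prefix; each subsequent pair is a component
-# name followed by its generation (an empty generation line becomes None).
-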
- |
-def _AppendComponentTrackerToParallelUploadTrackerFile(tracker_file, component, |
- tracker_file_lock): |
- """Appends info about the uploaded component to an existing tracker file. |
- |
- Follows the format described in _CreateParallelUploadTrackerFile. |
- |
- Args: |
- tracker_file: Tracker file to append to. |
- component: Component that was uploaded. |
- tracker_file_lock: Thread and process-safe Lock for the tracker file. |
- """ |
- lines = _GetParallelUploadTrackerFileLinesForComponents([component]) |
- lines = [line + '\n' for line in lines] |
- with tracker_file_lock: |
- with open(tracker_file, 'a') as f: |
- f.writelines(lines) |
- |
- |
-def _CreateParallelUploadTrackerFile(tracker_file, random_prefix, components, |
- tracker_file_lock): |
- """Writes information about components that were successfully uploaded. |
- |
- This way the upload can be resumed at a later date. The tracker file has |
- the format: |
- random_prefix |
- temp_object_1_name |
- temp_object_1_generation |
- . |
- . |
- . |
- temp_object_N_name |
- temp_object_N_generation |
- where N is the number of components that have been successfully uploaded. |
- |
- Args: |
- tracker_file: The name of the parallel upload tracker file. |
- random_prefix: The randomly-generated prefix that was used for
- uploading any existing components.
- components: A list of ObjectFromTracker objects that were uploaded. |
- tracker_file_lock: The lock protecting access to the tracker file. |
- """ |
- lines = [random_prefix] |
- lines += _GetParallelUploadTrackerFileLinesForComponents(components) |
- lines = [line + '\n' for line in lines] |
- try: |
- with tracker_file_lock: |
- open(tracker_file, 'w').close() # Clear the file. |
- with open(tracker_file, 'w') as f: |
- f.writelines(lines) |
- except IOError as e: |
- RaiseUnwritableTrackerFileException(tracker_file, e.strerror) |
- |
- |
-def _GetParallelUploadTrackerFileLinesForComponents(components): |
- """Return a list of the lines for use in a parallel upload tracker file. |
- |
- The lines represent the given components, using the format as described in |
- _CreateParallelUploadTrackerFile. |
- |
- Args: |
- components: A list of ObjectFromTracker objects that were uploaded. |
- |
- Returns: |
- Lines describing components with their generation for outputting to the |
- tracker file. |
- """ |
- lines = [] |
- for component in components: |
- generation = component.generation |
- if not generation: |
- generation = '' |
- lines += [component.object_name, str(generation)] |
- return lines |
- |
- |
-def FilterExistingComponents(dst_args, existing_components, bucket_url, |
- gsutil_api): |
- """Determines course of action for component objects. |
- |
- Given the list of all target objects based on partitioning the file and |
- the list of objects that have already been uploaded successfully, |
- this function determines which objects should be uploaded, which |
- existing components are still valid, and which existing components should |
- be deleted. |
- |
- Args: |
- dst_args: The map of file_name -> PerformParallelUploadFileToObjectArgs |
- calculated by partitioning the file. |
- existing_components: A list of ObjectFromTracker objects that have been |
- uploaded in the past. |
- bucket_url: CloudUrl of the bucket in which the components exist. |
- gsutil_api: gsutil Cloud API instance to use for retrieving object metadata. |
- |
- Returns: |
- components_to_upload: List of components that need to be uploaded. |
- uploaded_components: List of components that have already been |
- uploaded and are still valid. |
- existing_objects_to_delete: List of components that have already |
- been uploaded, but are no longer valid |
- and are in a versioned bucket, and |
- therefore should be deleted. |
- """ |
- components_to_upload = [] |
- existing_component_names = [component.object_name |
- for component in existing_components] |
- for component_name in dst_args: |
- if component_name not in existing_component_names: |
- components_to_upload.append(dst_args[component_name]) |
- |
- objects_already_chosen = [] |
- |
- # Don't reuse any temporary components whose MD5 doesn't match the current |
- # MD5 of the corresponding part of the file. If the bucket is versioned, |
- # also make sure that we delete the existing temporary version. |
- existing_objects_to_delete = [] |
- uploaded_components = [] |
- for tracker_object in existing_components: |
- if (tracker_object.object_name not in dst_args.keys() |
- or tracker_object.object_name in objects_already_chosen): |
- # This could happen if the component size has changed. This also serves |
- # to handle object names that get duplicated in the tracker file due |
- # to people doing things they shouldn't (e.g., overwriting an existing |
- # temporary component in a versioned bucket). |
- |
- url = bucket_url.Clone() |
- url.object_name = tracker_object.object_name |
- url.generation = tracker_object.generation |
- existing_objects_to_delete.append(url) |
- continue |
- |
- dst_arg = dst_args[tracker_object.object_name] |
- file_part = FilePart(dst_arg.filename, dst_arg.file_start, |
- dst_arg.file_length) |
- # TODO: calculate MD5's in parallel when possible. |
- content_md5 = CalculateB64EncodedMd5FromContents(file_part) |
- |
- try: |
- # Get the MD5 of the currently-existing component. |
- dst_url = dst_arg.dst_url |
- dst_metadata = gsutil_api.GetObjectMetadata( |
- dst_url.bucket_name, dst_url.object_name, |
- generation=dst_url.generation, provider=dst_url.scheme, |
- fields=['md5Hash', 'etag']) |
- cloud_md5 = dst_metadata.md5Hash |
- except Exception: # pylint: disable=broad-except |
- # We don't actually care what went wrong - we couldn't retrieve the |
- # object to check the MD5, so just upload it again. |
- cloud_md5 = None |
- |
- if cloud_md5 != content_md5: |
- components_to_upload.append(dst_arg) |
- objects_already_chosen.append(tracker_object.object_name) |
- if tracker_object.generation: |
- # If the old object doesn't have a generation (i.e., it isn't in a |
- # versioned bucket), then we will just overwrite it anyway. |
- invalid_component_with_generation = dst_arg.dst_url.Clone() |
- invalid_component_with_generation.generation = tracker_object.generation |
- existing_objects_to_delete.append(invalid_component_with_generation) |
- else: |
- url = dst_arg.dst_url.Clone() |
- url.generation = tracker_object.generation |
- uploaded_components.append(url) |
- objects_already_chosen.append(tracker_object.object_name) |
- |
- if uploaded_components: |
- logging.info('Found %d existing temporary components to reuse.', |
- len(uploaded_components)) |
- |
- return (components_to_upload, uploaded_components, |
- existing_objects_to_delete) |
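-
-
-# Editor's summary (not part of the original module) of the decision made by
-# FilterExistingComponents above for each previously uploaded component:
-#   - component name no longer among dst_args (or duplicated in the tracker
-#     file): schedule the old object for deletion;
-#   - cloud MD5 unavailable or different from the local part's MD5: re-upload
-#     the part (and delete the old generation, if the bucket is versioned);
-#   - cloud MD5 matches the local part's MD5: reuse the existing component.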