| Index: gslib/commands/cp.py
|
| ===================================================================
|
| --- gslib/commands/cp.py (revision 33376)
|
| +++ gslib/commands/cp.py (working copy)
|
| @@ -1,3 +1,4 @@
|
| +# -*- coding: utf-8 -*-
|
| # Copyright 2011 Google Inc. All Rights Reserved.
|
| # Copyright 2011, Nexenta Systems Inc.
|
| #
|
| @@ -12,155 +13,40 @@
|
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
| # See the License for the specific language governing permissions and
|
| # limitations under the License.
|
| +"""Implementation of Unix-like cp command for cloud storage providers."""
|
|
|
| -# Get the system logging module, not our local logging module.
|
| from __future__ import absolute_import
|
|
|
| -import binascii
|
| -import boto
|
| -import copy
|
| -import crcmod
|
| -import csv
|
| -import datetime
|
| -import errno
|
| -import gslib
|
| -import gzip
|
| -import hashlib
|
| -import logging
|
| -import mimetypes
|
| -import mmap
|
| -import multiprocessing
|
| import os
|
| -import platform
|
| -import random
|
| -import re
|
| -import stat
|
| -import subprocess
|
| -import sys
|
| -import tempfile
|
| -import textwrap
|
| -import threading
|
| import time
|
| import traceback
|
|
|
| -from gslib.util import AddAcceptEncoding
|
| -
|
| -try:
|
| - from hashlib import md5
|
| -except ImportError:
|
| - from md5 import md5
|
| -
|
| -from boto import config
|
| -from boto.exception import GSResponseError
|
| -from boto.exception import ResumableUploadException
|
| -from boto.gs.resumable_upload_handler import ResumableUploadHandler
|
| -from boto.s3.keyfile import KeyFile
|
| -from boto.s3.resumable_download_handler import ResumableDownloadHandler
|
| -from boto.storage_uri import BucketStorageUri
|
| -from boto.storage_uri import StorageUri
|
| -from collections import namedtuple
|
| -from gslib.bucket_listing_ref import BucketListingRef
|
| -from gslib.command import COMMAND_NAME
|
| -from gslib.command import COMMAND_NAME_ALIASES
|
| +from gslib import copy_helper
|
| +from gslib.cat_helper import CatHelper
|
| +from gslib.cloud_api import AccessDeniedException
|
| +from gslib.cloud_api import NotFoundException
|
| from gslib.command import Command
|
| -from gslib.command import FILE_URIS_OK
|
| -from gslib.command import MAX_ARGS
|
| -from gslib.command import MIN_ARGS
|
| -from gslib.command import PROVIDER_URIS_OK
|
| -from gslib.command import SUPPORTED_SUB_ARGS
|
| -from gslib.command import URIS_START_ARG
|
| from gslib.commands.compose import MAX_COMPONENT_COUNT
|
| -from gslib.commands.compose import MAX_COMPOSE_ARITY
|
| -from gslib.commands.config import DEFAULT_PARALLEL_COMPOSITE_UPLOAD_COMPONENT_SIZE
|
| -from gslib.commands.config import DEFAULT_PARALLEL_COMPOSITE_UPLOAD_THRESHOLD
|
| +from gslib.copy_helper import CreateCopyHelperOpts
|
| +from gslib.copy_helper import ItemExistsError
|
| +from gslib.copy_helper import Manifest
|
| +from gslib.copy_helper import PARALLEL_UPLOAD_TEMP_NAMESPACE
|
| +from gslib.cs_api_map import ApiSelector
|
| from gslib.exception import CommandException
|
| -from gslib.file_part import FilePart
|
| -from gslib.help_provider import HELP_NAME
|
| -from gslib.help_provider import HELP_NAME_ALIASES
|
| -from gslib.help_provider import HELP_ONE_LINE_SUMMARY
|
| -from gslib.help_provider import HELP_TEXT
|
| -from gslib.help_provider import HelpType
|
| -from gslib.help_provider import HELP_TYPE
|
| from gslib.name_expansion import NameExpansionIterator
|
| -from gslib.util import BOTO_IS_SECURE
|
| +from gslib.storage_url import ContainsWildcard
|
| from gslib.util import CreateLock
|
| -from gslib.util import CreateTrackerDirIfNeeded
|
| -from gslib.util import GetConfigFilePath
|
| -from gslib.util import ParseErrorDetail
|
| -from gslib.util import HumanReadableToBytes
|
| -from gslib.util import IS_WINDOWS
|
| +from gslib.util import GetCloudApiInstance
|
| +from gslib.util import IsCloudSubdirPlaceholder
|
| from gslib.util import MakeHumanReadable
|
| from gslib.util import NO_MAX
|
| -from gslib.util import TWO_MB
|
| -from gslib.util import UsingCrcmodExtension
|
| -from gslib.wildcard_iterator import ContainsWildcard
|
| -from gslib.name_expansion import NameExpansionResult
|
| +from gslib.util import RemoveCRLFFromString
|
|
|
| -
|
| -SLOW_CRC_WARNING = """
|
| -WARNING: Downloading this composite object requires integrity checking with
|
| -CRC32c, but your crcmod installation isn't using the module's C extension, so
|
| -the the hash computation will likely throttle download performance. For help
|
| -installing the extension, please see:
|
| - $ gsutil help crcmod
|
| -To disable slow integrity checking, see the "check_hashes" option in your boto
|
| -config file.
|
| -"""
|
| -
|
| -SLOW_CRC_EXCEPTION = CommandException(
|
| -"""
|
| -Downloading this composite object requires integrity checking with CRC32c, but
|
| -your crcmod installation isn't using the module's C extension, so the the hash
|
| -computation will likely throttle download performance. For help installing the
|
| -extension, please see:
|
| - $ gsutil help crcmod
|
| -To download regardless of crcmod performance or to skip slow integrity checks,
|
| -see the "check_hashes" option in your boto config file.""")
|
| -
|
| -NO_HASH_CHECK_WARNING = """
|
| -WARNING: This download will not be validated since your crcmod installation
|
| -doesn't use the module's C extension, so the hash computation would likely
|
| -throttle download performance. For help in installing the extension, please see:
|
| - $ gsutil help crcmod
|
| -To force integrity checking, see the "check_hashes" option in your boto config
|
| -file.
|
| -"""
|
| -
|
| -NO_SERVER_HASH_EXCEPTION = CommandException(
|
| -"""
|
| -This object has no server-supplied hash for performing integrity
|
| -checks. To skip integrity checking for such objects, see the "check_hashes"
|
| -option in your boto config file.""")
|
| -
|
| -NO_SERVER_HASH_WARNING = """
|
| -WARNING: This object has no server-supplied hash for performing integrity
|
| -checks. To force integrity checking, see the "check_hashes" option in your boto
|
| -config file.
|
| -"""
|
| -
|
| -PARALLEL_UPLOAD_TEMP_NAMESPACE = (
|
| - u'/gsutil/tmp/parallel_composite_uploads/for_details_see/gsutil_help_cp/')
|
| -
|
| -PARALLEL_UPLOAD_STATIC_SALT = u"""
|
| -PARALLEL_UPLOAD_SALT_TO_PREVENT_COLLISIONS.
|
| -The theory is that no user will have prepended this to the front of
|
| -one of their object names and then done an MD5 hash of the name, and
|
| -then prepended PARALLEL_UPLOAD_TEMP_NAMESPACE to the front of their object
|
| -name. Note that there will be no problems with object name length since we
|
| -hash the original name.
|
| -"""
|
| -
|
| -# In order to prevent people from uploading thousands of tiny files in parallel
|
| -# (which, apart from being useless, is likely to cause them to be throttled
|
| -# for the compose calls), don't allow files smaller than this to use parallel
|
| -# composite uploads.
|
| -MIN_PARALLEL_COMPOSITE_FILE_SIZE = 20971520 # 20 MB
|
| -
|
| SYNOPSIS_TEXT = """
|
| <B>SYNOPSIS</B>
|
| - gsutil cp [OPTION]... src_uri dst_uri
|
| - gsutil cp [OPTION]... src_uri... dst_uri
|
| - gsutil cp [OPTION]... -I dst_uri
|
| + gsutil cp [OPTION]... src_url dst_url
|
| + gsutil cp [OPTION]... src_url... dst_url
|
| + gsutil cp [OPTION]... -I dst_url
|
| """
|
|
|
| DESCRIPTION_TEXT = """
|
| @@ -186,14 +72,19 @@
|
|
|
| gsutil -m cp -R dir gs://my_bucket
|
|
|
| - You can pass a list of URIs to copy on STDIN instead of as command line
|
| - arguments by using the -I option. This allows you to use gsutil in a
|
| - pipeline to copy files and objects as generated by a program, such as:
|
| + You can pass a list of URLs (one per line) to copy on STDIN instead of as
|
| + command line arguments by using the -I option. This allows you to use gsutil
|
| + in a pipeline to upload or download files / objects as generated by a program,
|
| + such as:
|
|
|
| some_program | gsutil -m cp -I gs://my_bucket
|
|
|
| - The contents of STDIN can name files, cloud URIs, and wildcards of files
|
| - and cloud URIs.
|
| + or:
|
| +
|
| + some_program | gsutil -m cp -I ./download_dir
|
| +
|
| + The contents of STDIN can name files, cloud URLs, and wildcards of files
|
| + and cloud URLs.
|
| """
|
|
|
| NAME_CONSTRUCTION_TEXT = """
|
| @@ -237,6 +128,15 @@
|
| will create objects named like gs://my_bucket/subdir/dir2/a/b/c. In contrast,
|
| if gs://my_bucket/subdir does not exist, this same gsutil cp command will
|
| create objects named like gs://my_bucket/subdir/a/b/c.
|
| +
|
| + Note: If you use the
|
| + `Google Developers Console <https://console.developers.google.com>`_
|
| + to create folders, it does so by creating a "placeholder" object that ends
|
| + with a "/" character. gsutil skips these objects when downloading from the
|
| + cloud to the local file system, because attempting to create a file that
|
| + ends with a "/" is not allowed on Linux and MacOS. Because of this, it is
|
| + recommended that you not create objects that end with "/" (unless you don't
|
| + need to be able to download such objects using gsutil).
|
| """
|
|
|
| SUBDIRECTORIES_TEXT = """
|
| @@ -282,11 +182,11 @@
|
|
|
| COPY_IN_CLOUD_TEXT = """
|
| <B>COPYING IN THE CLOUD AND METADATA PRESERVATION</B>
|
| - If both the source and destination URI are cloud URIs from the same
|
| + If both the source and destination URL are cloud URLs from the same
|
| provider, gsutil copies data "in the cloud" (i.e., without downloading
|
| to and uploading from the machine where you run gsutil). In addition to
|
| the performance and cost advantages of doing this, copying in the cloud
|
| - preserves metadata (like Content-Type and Cache-Control). In contrast,
|
| + preserves metadata (like Content-Type and Cache-Control). In contrast,
|
| when you download data from the cloud it ends up in a file, which has
|
| no associated metadata. Thus, unless you have some way to hold on to
|
| or re-create that metadata, downloading to a file will not retain the
|
| @@ -296,20 +196,63 @@
|
| ACL to the new object, and instead will use the default bucket ACL (see
|
| "gsutil help defacl"). You can override this behavior with the -p
|
| option (see OPTIONS below).
|
| +
|
| + One additional note about copying in the cloud: If the destination bucket has
|
| + versioning enabled, gsutil cp will copy all versions of the source object(s).
|
| + For example:
|
| +
|
| + gsutil cp gs://bucket1/obj gs://bucket2
|
| +
|
| + will cause all versions of gs://bucket1/obj to be copied to gs://bucket2.
|
| """
|
|
|
| +FAILURE_HANDLING_TEXT = """
|
| +<B>CHECKSUM VALIDATION AND FAILURE HANDLING</B>
|
| + At the end of every upload or download, the gsutil cp command validates that
|
| +  the checksum of the source file/object matches the checksum of the
|
| + destination file/object. If the checksums do not match, gsutil will delete
|
| + the invalid copy and print a warning message. This very rarely happens, but
|
| + if it does, please contact gs-team@google.com.
|
| +
|
| + The cp command will retry when failures occur, but if enough failures happen
|
| + during a particular copy or delete operation the command will skip that object
|
| + and move on. At the end of the copy run if any failures were not successfully
|
| + retried, the cp command will report the count of failures, and exit with
|
| + non-zero status.
|
| +
|
| + Note that there are cases where retrying will never succeed, such as if you
|
| + don't have write permission to the destination bucket or if the destination
|
| + path for some objects is longer than the maximum allowed length.
|
| +
|
| + For more details about gsutil's retry handling, please see
|
| + "gsutil help retries".
|
| +"""
|
| +
|
| RESUMABLE_TRANSFERS_TEXT = """
|
| <B>RESUMABLE TRANSFERS</B>
|
| - gsutil automatically uses the Google Cloud Storage resumable upload
|
| - feature whenever you use the cp command to upload an object that is larger
|
| - than 2 MB. You do not need to specify any special command line options
|
| - to make this happen. If your upload is interrupted you can restart the
|
| - upload by running the same cp command that you ran to start the upload.
|
| + gsutil automatically uses the Google Cloud Storage resumable upload feature
|
| + whenever you use the cp command to upload an object that is larger than 2
|
| + MB. You do not need to specify any special command line options to make this
|
| + happen. If your upload is interrupted you can restart the upload by running
|
| + the same cp command that you ran to start the upload. Until the upload
|
| + has completed successfully, it will not be visible at the destination object
|
| + and will not replace any existing object the upload is intended to overwrite.
|
| + (However, see the section on PARALLEL COMPOSITE UPLOADS, which may leave
|
| + temporary component objects in place during the upload process.)
|
|
|
| Similarly, gsutil automatically performs resumable downloads (using HTTP
|
| standard Range GET operations) whenever you use the cp command to download an
|
| - object larger than 2 MB.
|
| + object larger than 2 MB. In this case the partially downloaded file will be
|
| + visible as soon as it starts being written. Thus, before you attempt to use
|
| + any files downloaded by gsutil you should make sure the download completed
|
| + successfully, by checking the exit status from the gsutil command. This can
|
| + be done in a bash script, for example, by doing:
|
|
|
| + gsutil cp gs://your-bucket/your-object ./local-file
|
| +      if [ "$?" -ne "0" ] ; then
|
| + << Code that handles failures >>
|
| + fi
|
| +
|
| Resumable uploads and downloads store some state information in a file
|
| in ~/.gsutil named by the destination object or file. If you attempt to
|
| resume a transfer from a machine with a different directory, the transfer
|
| @@ -321,7 +264,7 @@
|
|
|
| STREAMING_TRANSFERS_TEXT = """
|
| <B>STREAMING TRANSFERS</B>
|
| - Use '-' in place of src_uri or dst_uri to perform a streaming
|
| + Use '-' in place of src_url or dst_url to perform a streaming
|
| transfer. For example:
|
|
|
| long_running_computation | gsutil cp - gs://my_bucket/obj
|
| @@ -333,20 +276,29 @@
|
|
|
| PARALLEL_COMPOSITE_UPLOADS_TEXT = """
|
| <B>PARALLEL COMPOSITE UPLOADS</B>
|
| - gsutil automatically uses `object composition <https://developers.google.com/storage/docs/composite-objects>`_
|
| - to perform uploads in parallel for large, local files being uploaded to
|
| - Google Cloud Storage. This means that, by default, a large file will be split
|
| - into component pieces that will be uploaded in parallel. Those components will
|
| - then be composed in the cloud, and the temporary components in the cloud will
|
| - be deleted after successful composition. No additional local disk space is
|
| - required for this operation.
|
| + gsutil can automatically use
|
| + `object composition <https://developers.google.com/storage/docs/composite-objects>`_
|
| + to perform uploads in parallel for large, local files being uploaded to Google
|
| + Cloud Storage. This means that, if enabled (see next paragraph), a large file
|
| + will be split into component pieces that will be uploaded in parallel. Those
|
| + components will then be composed in the cloud, and the temporary components in
|
| + the cloud will be deleted after successful composition. No additional local
|
| + disk space is required for this operation.
|
|
|
| - Any file whose size exceeds the "parallel_composite_upload_threshold" config
|
| - variable will trigger this feature by default. The ideal size of a
|
| - component can also be set with the "parallel_composite_upload_component_size"
|
| - config variable. See the .boto config file for details about how these values
|
| - are used.
|
| + If the "parallel_composite_upload_threshold" config value is not 0 (which
|
| +  disables the feature), any file whose size exceeds the specified size will
|
| + trigger a parallel composite upload. Note that at present parallel composite
|
| + uploads are disabled by default, because using composite objects requires a
|
| + compiled crcmod (see "gsutil help crcmod"), and for operating systems that
|
| + don't already have this package installed this makes gsutil harder to use.
|
| + Google is actively working with a number of the Linux distributions to get
|
| + crcmod included with the stock distribution. Once that is done we will
|
| + re-enable parallel composite uploads by default in gsutil.
|
|
|
| + The ideal size of a component can also be set with the
|
| + "parallel_composite_upload_component_size" config variable. See the comments
|
| + in the .boto config file for details about how these values are used.
|
| +
|
| If the transfer fails prior to composition, running the command again will
|
| take advantage of resumable uploads for those components that failed, and
|
| the component objects will be deleted after the first successful attempt.
|
| @@ -357,6 +309,22 @@
|
| where <random ID> is some numerical value, and <hash> is an MD5 hash (not
|
| related to the hash of the contents of the file or object).
|
|
|
| + To avoid leaving temporary objects around, you should make sure to check the
|
| + exit status from the gsutil command. This can be done in a bash script, for
|
| + example, by doing:
|
| +
|
| + gsutil cp ./local-file gs://your-bucket/your-object
|
| +      if [ "$?" -ne "0" ] ; then
|
| + << Code that handles failures >>
|
| + fi
|
| +
|
| + Or, for copying a directory, use this instead:
|
| +
|
| + gsutil cp -c -L cp.log -R ./dir gs://bucket
|
| +      if [ "$?" -ne "0" ] ; then
|
| + << Code that handles failures >>
|
| + fi
|
| +
|
| One important caveat is that files uploaded in this fashion are still subject
|
| to the maximum number of components limit. For example, if you upload a large
|
| file that gets split into %d components, and try to compose it with another
|
| @@ -373,6 +341,7 @@
|
| """ % (PARALLEL_UPLOAD_TEMP_NAMESPACE, 10, MAX_COMPONENT_COUNT - 9,
|
| MAX_COMPONENT_COUNT)
|
|
|
| +
|
| CHANGING_TEMP_DIRECTORIES_TEXT = """
|
| <B>CHANGING TEMP DIRECTORIES</B>
|
| gsutil writes data to a temporary directory in several cases:
|
| @@ -405,214 +374,191 @@
|
|
|
| OPTIONS_TEXT = """
|
| <B>OPTIONS</B>
|
| - -a canned_acl Sets named canned_acl when uploaded objects created. See
|
| - 'gsutil help acls' for further details.
|
| + -a canned_acl Sets named canned_acl when uploaded objects created. See
|
| + 'gsutil help acls' for further details.
|
|
|
| - -c If an error occurrs, continue to attempt to copy the remaining
|
| - files. Note that this option is always true when running
|
| - "gsutil -m cp".
|
| +  -c            If an error occurs, continue to attempt to copy the remaining
|
| + files. If any copies were unsuccessful, gsutil's exit status
|
| + will be non-zero even if this flag is set. This option is
|
| + implicitly set when running "gsutil -m cp...". Note: -c only
|
| + applies to the actual copying operation. If an error occurs
|
| + while iterating over the files in the local directory (e.g.,
|
| + invalid Unicode file name) gsutil will print an error message
|
| + and abort.
|
|
|
| - -D Copy in "daisy chain" mode, i.e., copying between two buckets by
|
| - hooking a download to an upload, via the machine where gsutil is
|
| - run. By default, data are copied between two buckets
|
| - "in the cloud", i.e., without needing to copy via the machine
|
| - where gsutil runs.
|
| + -D Copy in "daisy chain" mode, i.e., copying between two buckets
|
| + by hooking a download to an upload, via the machine where
|
| + gsutil is run. By default, data are copied between two buckets
|
| + "in the cloud", i.e., without needing to copy via the machine
|
| + where gsutil runs.
|
|
|
| - By default, a "copy in the cloud" when the source is a composite
|
| - object will retain the composite nature of the object. However,
|
| - Daisy chain mode can be used to change a composite object into
|
| - a non-composite object. For example:
|
| + By default, a "copy in the cloud" when the source is a
|
| + composite object will retain the composite nature of the
|
| + object. However, Daisy chain mode can be used to change a
|
| + composite object into a non-composite object. For example:
|
|
|
| - gsutil cp -D -p gs://bucket/obj gs://bucket/obj_tmp
|
| - gsutil mv -p gs://bucket/obj_tmp gs://bucket/obj
|
| + gsutil cp -D -p gs://bucket/obj gs://bucket/obj_tmp
|
| + gsutil mv -p gs://bucket/obj_tmp gs://bucket/obj
|
|
|
| - Note: Daisy chain mode is automatically used when copying
|
| - between providers (e.g., to copy data from Google Cloud Storage
|
| - to another provider).
|
| + Note: Daisy chain mode is automatically used when copying
|
| + between providers (e.g., to copy data from Google Cloud Storage
|
| + to another provider).
|
|
|
| - -e Exclude symlinks. When specified, symbolic links will not be
|
| - copied.
|
| + -e Exclude symlinks. When specified, symbolic links will not be
|
| + copied.
|
|
|
| - -L <file> Outputs a manifest log file with detailed information about each
|
| - item that was copied. This manifest contains the following
|
| - information for each item:
|
| + -I Causes gsutil to read the list of files or objects to copy from
|
| + stdin. This allows you to run a program that generates the list
|
| + of files to upload/download.
|
|
|
| - - Source path.
|
| - - Destination path.
|
| - - Source size.
|
| - - Bytes transferred.
|
| - - MD5 hash.
|
| - - UTC date and time transfer was started in ISO 8601 format.
|
| - - UTC date and time transfer was completed in ISO 8601 format.
|
| - - Upload id, if a resumable upload was performed.
|
| - - Final result of the attempted transfer, success or failure.
|
| - - Failure details, if any.
|
| + -L <file> Outputs a manifest log file with detailed information about
|
| + each item that was copied. This manifest contains the following
|
| + information for each item:
|
|
|
| - If the log file already exists, gsutil will use the file as an
|
| - input to the copy process, and will also append log items to the
|
| - existing file. Files/objects that are marked in the existing log
|
| - file as having been successfully copied (or skipped) will be
|
| - ignored. Files/objects without entries will be copied and ones
|
| - previously marked as unsuccessful will be retried. This can be
|
| - used in conjunction with the -c option to build a script that
|
| - copies a large number of objects reliably, using a bash script
|
| - like the following:
|
| + - Source path.
|
| + - Destination path.
|
| + - Source size.
|
| + - Bytes transferred.
|
| + - MD5 hash.
|
| + - UTC date and time transfer was started in ISO 8601 format.
|
| + - UTC date and time transfer was completed in ISO 8601 format.
|
| + - Upload id, if a resumable upload was performed.
|
| + - Final result of the attempted transfer, success or failure.
|
| + - Failure details, if any.
|
|
|
| - status=1
|
| - while [ $status -ne 0 ] ; do
|
| - gsutil cp -c -L cp.log -R ./dir gs://bucket
|
| - status=$?
|
| - done
|
| + If the log file already exists, gsutil will use the file as an
|
| + input to the copy process, and will also append log items to
|
| + the existing file. Files/objects that are marked in the
|
| + existing log file as having been successfully copied (or
|
| + skipped) will be ignored. Files/objects without entries will be
|
| + copied and ones previously marked as unsuccessful will be
|
| + retried. This can be used in conjunction with the -c option to
|
| + build a script that copies a large number of objects reliably,
|
| + using a bash script like the following:
|
|
|
| - The -c option will cause copying to continue after failures
|
| - occur, and the -L option will allow gsutil to pick up where it
|
| - left off without duplicating work. The loop will continue
|
| - running as long as gsutil exits with a non-zero status (such a
|
| - status indicates there was at least one failure during the
|
| - gsutil run).
|
| + until gsutil cp -c -L cp.log -R ./dir gs://bucket; do
|
| + sleep 1
|
| + done
|
|
|
| - -n No-clobber. When specified, existing files or objects at the
|
| - destination will not be overwritten. Any items that are skipped
|
| - by this option will be reported as being skipped. This option
|
| - will perform an additional HEAD request to check if an item
|
| - exists before attempting to upload the data. This will save
|
| - retransmitting data, but the additional HTTP requests may make
|
| - small object transfers slower and more expensive.
|
| + The -c option will cause copying to continue after failures
|
| + occur, and the -L option will allow gsutil to pick up where it
|
| + left off without duplicating work. The loop will continue
|
| + running as long as gsutil exits with a non-zero status (such a
|
| + status indicates there was at least one failure during the
|
| + gsutil run).
|
|
|
| - -p Causes source ACLs to be preserved when copying in the cloud.
|
| - Note that this option has performance and cost implications,
|
| - because it is essentially performing three requests ('acl get',
|
| - cp, 'acl set'). (The performance issue can be mitigated to some
|
| - degree by using gsutil -m cp to cause parallel copying.)
|
| + Note: If you're trying to synchronize the contents of a
|
| + directory and a bucket (or two buckets), see
|
| + 'gsutil help rsync'.
|
|
|
| - You can avoid the additional performance and cost of using cp -p
|
| - if you want all objects in the destination bucket to end up with
|
| - the same ACL by setting a default ACL on that bucket instead of
|
| - using cp -p. See "help gsutil defacl".
|
| + -n No-clobber. When specified, existing files or objects at the
|
| + destination will not be overwritten. Any items that are skipped
|
| + by this option will be reported as being skipped. This option
|
| + will perform an additional GET request to check if an item
|
| + exists before attempting to upload the data. This will save
|
| + retransmitting data, but the additional HTTP requests may make
|
| + small object transfers slower and more expensive.
|
|
|
| - Note that it's not valid to specify both the -a and -p options
|
| - together.
|
| + -p Causes ACLs to be preserved when copying in the cloud. Note
|
| + that this option has performance and cost implications when
|
| + using the XML API, as it requires separate HTTP calls for
|
| + interacting with ACLs. The performance issue can be mitigated
|
| + to some degree by using gsutil -m cp to cause parallel copying.
|
| + Also, this option only works if you have OWNER access to all of
|
| + the objects that are copied.
|
|
|
| - -q Deprecated. Please use gsutil -q cp ... instead.
|
| + You can avoid the additional performance and cost of using
|
| + cp -p if you want all objects in the destination bucket to end
|
| + up with the same ACL by setting a default object ACL on that
|
| + bucket instead of using cp -p. See "help gsutil defacl".
|
|
|
| - -R, -r Causes directories, buckets, and bucket subdirectories to be
|
| - copied recursively. If you neglect to use this option for
|
| - an upload, gsutil will copy any files it finds and skip any
|
| - directories. Similarly, neglecting to specify -R for a download
|
| - will cause gsutil to copy any objects at the current bucket
|
| - directory level, and skip any subdirectories.
|
| + Note that it's not valid to specify both the -a and -p options
|
| + together.
|
|
|
| - -v Requests that the version-specific URI for each uploaded object
|
| - be printed. Given this URI you can make future upload requests
|
| - that are safe in the face of concurrent updates, because Google
|
| - Cloud Storage will refuse to perform the update if the current
|
| - object version doesn't match the version-specific URI. See
|
| - 'gsutil help versions' for more details.
|
| + -R, -r Causes directories, buckets, and bucket subdirectories to be
|
| + copied recursively. If you neglect to use this option for
|
| + an upload, gsutil will copy any files it finds and skip any
|
| + directories. Similarly, neglecting to specify -R for a download
|
| + will cause gsutil to copy any objects at the current bucket
|
| + directory level, and skip any subdirectories.
|
|
|
| - -z <ext,...> Applies gzip content-encoding to file uploads with the given
|
| - extensions. This is useful when uploading files with
|
| - compressible content (such as .js, .css, or .html files) because
|
| - it saves network bandwidth and space in Google Cloud Storage,
|
| - which in turn reduces storage costs.
|
| + -v Requests that the version-specific URL for each uploaded object
|
| + be printed. Given this URL you can make future upload requests
|
| + that are safe in the face of concurrent updates, because Google
|
| + Cloud Storage will refuse to perform the update if the current
|
| + object version doesn't match the version-specific URL. See
|
| + 'gsutil help versions' for more details.
|
|
|
| - When you specify the -z option, the data from your files is
|
| - compressed before it is uploaded, but your actual files are left
|
| - uncompressed on the local disk. The uploaded objects retain the
|
| - Content-Type and name of the original files but are given a
|
| - Content-Encoding header with the value "gzip" to indicate that
|
| - the object data stored are compressed on the Google Cloud
|
| - Storage servers.
|
| + -z <ext,...> Applies gzip content-encoding to file uploads with the given
|
| + extensions. This is useful when uploading files with
|
| + compressible content (such as .js, .css, or .html files)
|
| + because it saves network bandwidth and space in Google Cloud
|
| + Storage, which in turn reduces storage costs.
|
|
|
| - For example, the following command:
|
| + When you specify the -z option, the data from your files is
|
| + compressed before it is uploaded, but your actual files are
|
| + left uncompressed on the local disk. The uploaded objects
|
| + retain the Content-Type and name of the original files but are
|
| + given a Content-Encoding header with the value "gzip" to
|
| + indicate that the object data stored are compressed on the
|
| + Google Cloud Storage servers.
|
|
|
| - gsutil cp -z html -a public-read cattypes.html gs://mycats
|
| + For example, the following command:
|
|
|
| - will do all of the following:
|
| + gsutil cp -z html -a public-read cattypes.html gs://mycats
|
|
|
| - - Upload as the object gs://mycats/cattypes.html (cp command)
|
| - - Set the Content-Type to text/html (based on file extension)
|
| - - Compress the data in the file cattypes.html (-z option)
|
| - - Set the Content-Encoding to gzip (-z option)
|
| - - Set the ACL to public-read (-a option)
|
| - - If a user tries to view cattypes.html in a browser, the
|
| - browser will know to uncompress the data based on the
|
| - Content-Encoding header, and to render it as HTML based on
|
| - the Content-Type header.
|
| + will do all of the following:
|
| +
|
| + - Upload as the object gs://mycats/cattypes.html (cp command)
|
| + - Set the Content-Type to text/html (based on file extension)
|
| + - Compress the data in the file cattypes.html (-z option)
|
| + - Set the Content-Encoding to gzip (-z option)
|
| + - Set the ACL to public-read (-a option)
|
| + - If a user tries to view cattypes.html in a browser, the
|
| + browser will know to uncompress the data based on the
|
| + Content-Encoding header, and to render it as HTML based on
|
| + the Content-Type header.
|
| +
|
| + Note that if you download an object with Content-Encoding:gzip
|
| + gsutil will decompress the content before writing the local
|
| + file.
|
| """
|
|
|
| -_detailed_help_text = '\n\n'.join([SYNOPSIS_TEXT,
|
| +_DETAILED_HELP_TEXT = '\n\n'.join([SYNOPSIS_TEXT,
|
| DESCRIPTION_TEXT,
|
| NAME_CONSTRUCTION_TEXT,
|
| SUBDIRECTORIES_TEXT,
|
| COPY_IN_CLOUD_TEXT,
|
| + FAILURE_HANDLING_TEXT,
|
| RESUMABLE_TRANSFERS_TEXT,
|
| STREAMING_TRANSFERS_TEXT,
|
| PARALLEL_COMPOSITE_UPLOADS_TEXT,
|
| CHANGING_TEMP_DIRECTORIES_TEXT,
|
| OPTIONS_TEXT])
|
|
|
| -# This tuple is used only to encapsulate the arguments needed for
|
| -# _PerformResumableUploadIfApplies, so that the arguments fit the model of
|
| -# command.Apply().
|
| -PerformResumableUploadIfAppliesArgs = namedtuple(
|
| - 'PerformResumableUploadIfAppliesArgs',
|
| - 'filename file_start file_length src_uri dst_uri canned_acl headers '
|
| - 'tracker_file tracker_file_lock')
|
|
|
| -ObjectFromTracker = namedtuple('ObjectFromTracker',
|
| - 'object_name generation')
|
| +CP_SUB_ARGS = 'a:cDeIL:MNnprRtvz:'
|
|
|
| -CP_SUB_ARGS = 'a:cDeIL:MNnpqrRtvz:'
|
|
|
| -# The maximum length of a file name can vary wildly between different
|
| -# operating systems, so we always ensure that tracker files are less
|
| -# than 100 characters in order to avoid any such issues.
|
| -MAX_TRACKER_FILE_NAME_LENGTH = 100
|
| +def _CopyFuncWrapper(cls, args, thread_state=None):
|
| + cls.CopyFunc(args, thread_state=thread_state)
|
|
|
|
|
| -class TrackerFileType(object):
|
| - UPLOAD = 1
|
| - DOWNLOAD = 2
|
| - PARALLEL_UPLOAD = 3
|
| -
|
| -def _CopyFuncWrapper(cls, args):
|
| - cls._CopyFunc(args)
|
| -
|
| -def _PerformResumableUploadIfAppliesWrapper(cls, args):
|
| - """A wrapper for cp._PerformResumableUploadIfApplies, which takes in a
|
| - PerformResumableUploadIfAppliesArgs, extracts the arguments to form the
|
| - arguments for the wrapped function, and then calls the wrapped function.
|
| - This was designed specifically for use with command.Apply().
|
| - """
|
| - fp = FilePart(args.filename, args.file_start, args.file_length)
|
| - with fp:
|
| - already_split = True
|
| - ret = cls._PerformResumableUploadIfApplies(
|
| - fp, args.src_uri, args.dst_uri, args.canned_acl, args.headers,
|
| - fp.length, already_split)
|
| -
|
| - # Update the tracker file after each call in order to be as robust as possible
|
| - # against interrupts, failures, etc.
|
| - component = ret[2]
|
| - _AppendComponentTrackerToParallelUploadTrackerFile(args.tracker_file,
|
| - component,
|
| - args.tracker_file_lock)
|
| - return ret
|
| -
|
| def _CopyExceptionHandler(cls, e):
|
| """Simple exception handler to allow post-completion status."""
|
| cls.logger.error(str(e))
|
| - cls.copy_failure_count += 1
|
| - cls.logger.debug(('\n\nEncountered exception while copying:\n%s\n' %
|
| - traceback.format_exc()))
|
| + cls.op_failure_count += 1
|
| + cls.logger.debug('\n\nEncountered exception while copying:\n%s\n',
|
| + traceback.format_exc())
|
|
|
| +
|
| def _RmExceptionHandler(cls, e):
|
| """Simple exception handler to allow post-completion status."""
|
| cls.logger.error(str(e))
|
|
|
| +
|
| class CpCommand(Command):
|
| - """
|
| - Implementation of gsutil cp command.
|
| + """Implementation of gsutil cp command.
|
|
|
| Note that CpCommand is run for both gsutil cp and gsutil mv. The latter
|
| happens by MvCommand calling CpCommand and passing the hidden (undocumented)
|
| @@ -631,1648 +577,243 @@
|
| of processing the bucket listing iterator on the fly).
|
| """
|
|
|
| - # TODO: Refactor this file to be less cumbersome. In particular, some of the
|
| - # different paths (e.g., uploading a file to an object vs. downloading an
|
| - # object to a file) could be split into separate files.
|
| + # Command specification. See base class for documentation.
|
| + command_spec = Command.CreateCommandSpec(
|
| + 'cp',
|
| + command_name_aliases=['copy'],
|
| + min_args=1,
|
| + max_args=NO_MAX,
|
| + # -t is deprecated but leave intact for now to avoid breakage.
|
| + supported_sub_args=CP_SUB_ARGS,
|
| + file_url_ok=True,
|
| + provider_url_ok=False,
|
| + urls_start_arg=0,
|
| + gs_api_support=[ApiSelector.XML, ApiSelector.JSON],
|
| + gs_default_api=ApiSelector.JSON,
|
| + supported_private_args=['haltatbyte='],
|
| + )
|
| + # Help specification. See help_provider.py for documentation.
|
| + help_spec = Command.HelpSpec(
|
| + help_name='cp',
|
| + help_name_aliases=['copy'],
|
| + help_type='command_help',
|
| + help_one_line_summary='Copy files and objects',
|
| + help_text=_DETAILED_HELP_TEXT,
|
| + subcommand_help_text={},
|
| + )
|
|
|
| - # Set default Content-Type type.
|
| - DEFAULT_CONTENT_TYPE = 'application/octet-stream'
|
| - USE_MAGICFILE = boto.config.getbool('GSUtil', 'use_magicfile', False)
|
| - # Chunk size to use while unzipping gzip files.
|
| - GUNZIP_CHUNK_SIZE = 8192
|
| -
|
| - # Command specification (processed by parent class).
|
| - command_spec = {
|
| - # Name of command.
|
| - COMMAND_NAME : 'cp',
|
| - # List of command name aliases.
|
| - COMMAND_NAME_ALIASES : ['copy'],
|
| - # Min number of args required by this command.
|
| - MIN_ARGS : 1,
|
| - # Max number of args required by this command, or NO_MAX.
|
| - MAX_ARGS : NO_MAX,
|
| - # Getopt-style string specifying acceptable sub args.
|
| - # -t is deprecated but leave intact for now to avoid breakage.
|
| - SUPPORTED_SUB_ARGS : CP_SUB_ARGS,
|
| - # True if file URIs acceptable for this command.
|
| - FILE_URIS_OK : True,
|
| - # True if provider-only URIs acceptable for this command.
|
| - PROVIDER_URIS_OK : False,
|
| - # Index in args of first URI arg.
|
| - URIS_START_ARG : 0,
|
| - }
|
| - help_spec = {
|
| - # Name of command or auxiliary help info for which this help applies.
|
| - HELP_NAME : 'cp',
|
| - # List of help name aliases.
|
| - HELP_NAME_ALIASES : ['copy'],
|
| - # Type of help:
|
| - HELP_TYPE : HelpType.COMMAND_HELP,
|
| - # One line summary of this help.
|
| - HELP_ONE_LINE_SUMMARY : 'Copy files and objects',
|
| - # The full help text.
|
| - HELP_TEXT : _detailed_help_text,
|
| - }
|
| -
|
| - def _GetMD5FromETag(self, key):
|
| - if not key.etag:
|
| - return None
|
| - possible_md5 = key.etag.strip('"\'').lower()
|
| - if re.match(r'^[0-9a-f]{32}$', possible_md5):
|
| - return binascii.a2b_hex(possible_md5)
|
| -
|
| - def _CheckHashes(self, key, file_name, hash_algs_to_compute,
|
| - computed_hashes=None):
|
| - """Validates integrity by comparing cloud digest to local digest.
|
| -
|
| - Args:
|
| - key: Instance of boto Key object.
|
| - file_name: Name of downloaded file on local disk.
|
| - hash_algs_to_compute: Dictionary mapping hash algorithm names to digester
|
| - objects.
|
| - computed_hashes: If specified, use this dictionary mapping hash algorithm
|
| - names to the calculated digest. If not specified, the
|
| - key argument will be checked for local_digests property.
|
| - If neither exist, the local file will be opened and
|
| - digests calculated on-demand.
|
| -
|
| - Raises:
|
| - CommandException: if cloud digests don't match local digests.
|
| - """
|
| - cloud_hashes = {}
|
| - if hasattr(key, 'cloud_hashes'):
|
| - cloud_hashes = key.cloud_hashes
|
| - # Check for older-style MD5-based etag.
|
| - etag_md5 = self._GetMD5FromETag(key)
|
| - if etag_md5:
|
| - cloud_hashes.setdefault('md5', etag_md5)
|
| -
|
| - local_hashes = {}
|
| - # If we've already computed a valid local hash, use that, else calculate an
|
| - # md5 or crc32c depending on what we have available to compare against.
|
| - if computed_hashes:
|
| - local_hashes = computed_hashes
|
| - elif hasattr(key, 'local_hashes') and key.local_hashes:
|
| - local_hashes = key.local_hashes
|
| - elif 'md5' in cloud_hashes and 'md5' in hash_algs_to_compute:
|
| - self.logger.info(
|
| - 'Computing MD5 from scratch for resumed download')
|
| -
|
| - # Open file in binary mode to avoid surprises in Windows.
|
| - with open(file_name, 'rb') as fp:
|
| - local_hashes['md5'] = binascii.a2b_hex(key.compute_md5(fp)[0])
|
| - elif 'crc32c' in cloud_hashes and 'crc32c' in hash_algs_to_compute:
|
| - self.logger.info(
|
| - 'Computing CRC32C from scratch for resumed download')
|
| -
|
| - # Open file in binary mode to avoid surprises in Windows.
|
| - with open(file_name, 'rb') as fp:
|
| - crc32c_alg = lambda: crcmod.predefined.Crc('crc-32c')
|
| - crc32c_hex = key.compute_hash(
|
| - fp, algorithm=crc32c_alg)[0]
|
| - local_hashes['crc32c'] = binascii.a2b_hex(crc32c_hex)
|
| -
|
| - for alg in local_hashes:
|
| - if alg not in cloud_hashes:
|
| - continue
|
| - local_hexdigest = binascii.b2a_hex(local_hashes[alg])
|
| - cloud_hexdigest = binascii.b2a_hex(cloud_hashes[alg])
|
| - self.logger.debug('Comparing local vs cloud %s-checksum. (%s/%s)' % (
|
| - alg, local_hexdigest, cloud_hexdigest))
|
| - if local_hexdigest != cloud_hexdigest:
|
| - raise CommandException(
|
| - '%s signature computed for local file (%s) doesn\'t match '
|
| - 'cloud-supplied digest (%s). Local file (%s) deleted.' % (
|
| - alg, local_hexdigest, cloud_hexdigest, file_name))
|
| -
|
| - def _CheckForDirFileConflict(self, exp_src_uri, dst_uri):
|
| - """Checks whether copying exp_src_uri into dst_uri is not possible.
|
| -
|
| - This happens if a directory exists in local file system where a file
|
| - needs to go or vice versa. In that case we print an error message and
|
| - exits. Example: if the file "./x" exists and you try to do:
|
| - gsutil cp gs://mybucket/x/y .
|
| - the request can't succeed because it requires a directory where
|
| - the file x exists.
|
| -
|
| - Note that we don't enforce any corresponding restrictions for buckets,
|
| - because the flat namespace semantics for buckets doesn't prohibit such
|
| - cases the way hierarchical file systems do. For example, if a bucket
|
| - contains an object called gs://bucket/dir and then you run the command:
|
| - gsutil cp file1 file2 gs://bucket/dir
|
| - you'll end up with objects gs://bucket/dir, gs://bucket/dir/file1, and
|
| - gs://bucket/dir/file2.
|
| -
|
| - Args:
|
| - exp_src_uri: Expanded source StorageUri of copy.
|
| - dst_uri: Destination URI.
|
| -
|
| - Raises:
|
| - CommandException: if errors encountered.
|
| - """
|
| - if dst_uri.is_cloud_uri():
|
| - # The problem can only happen for file destination URIs.
|
| - return
|
| - dst_path = dst_uri.object_name
|
| - final_dir = os.path.dirname(dst_path)
|
| - if os.path.isfile(final_dir):
|
| - raise CommandException('Cannot retrieve %s because a file exists '
|
| - 'where a directory needs to be created (%s).' %
|
| - (exp_src_uri, final_dir))
|
| - if os.path.isdir(dst_path):
|
| - raise CommandException('Cannot retrieve %s because a directory exists '
|
| - '(%s) where the file needs to be created.' %
|
| - (exp_src_uri, dst_path))
|
| -
|
| - def _InsistDstUriNamesContainer(self, exp_dst_uri,
|
| - have_existing_dst_container, command_name):
|
| - """
|
| - Raises an exception if URI doesn't name a directory, bucket, or bucket
|
| - subdir, with special exception for cp -R (see comments below).
|
| -
|
| - Args:
|
| - exp_dst_uri: Wildcard-expanding dst_uri.
|
| - have_existing_dst_container: bool indicator of whether exp_dst_uri
|
| - names a container (directory, bucket, or existing bucket subdir).
|
| - command_name: Name of command making call. May not be the same as
|
| - self.command_name in the case of commands implemented atop other
|
| - commands (like mv command).
|
| -
|
| - Raises:
|
| - CommandException: if the URI being checked does not name a container.
|
| - """
|
| - if exp_dst_uri.is_file_uri():
|
| - ok = exp_dst_uri.names_directory()
|
| - else:
|
| - if have_existing_dst_container:
|
| - ok = True
|
| - else:
|
| - # It's ok to specify a non-existing bucket subdir, for example:
|
| - # gsutil cp -R dir gs://bucket/abc
|
| - # where gs://bucket/abc isn't an existing subdir.
|
| - ok = exp_dst_uri.names_object()
|
| - if not ok:
|
| - raise CommandException('Destination URI must name a directory, bucket, '
|
| - 'or bucket\nsubdirectory for the multiple '
|
| - 'source form of the %s command.' % command_name)
|
| -
|
| - class _FileCopyCallbackHandler(object):
|
| - """Outputs progress info for large copy requests."""
|
| -
|
| - def __init__(self, upload, logger):
|
| - if upload:
|
| - self.announce_text = 'Uploading'
|
| - else:
|
| - self.announce_text = 'Downloading'
|
| - self.logger = logger
|
| -
|
| - def call(self, total_bytes_transferred, total_size):
|
| - # Use sys.stderr.write instead of self.logger.info so progress messages
|
| - # output on a single continuously overwriting line.
|
| - if self.logger.isEnabledFor(logging.INFO):
|
| - sys.stderr.write('%s: %s/%s \r' % (
|
| - self.announce_text,
|
| - MakeHumanReadable(total_bytes_transferred),
|
| - MakeHumanReadable(total_size)))
|
| - if total_bytes_transferred == total_size:
|
| - sys.stderr.write('\n')
|
| -
|
| - class _StreamCopyCallbackHandler(object):
|
| - """Outputs progress info for Stream copy to cloud.
|
| - Total Size of the stream is not known, so we output
|
| - only the bytes transferred.
|
| - """
|
| -
|
| - def __init__(self, logger):
|
| - self.logger = logger
|
| -
|
| - def call(self, total_bytes_transferred, total_size):
|
| - # Use sys.stderr.write instead of self.logger.info so progress messages
|
| - # output on a single continuously overwriting line.
|
| - if self.logger.isEnabledFor(logging.INFO):
|
| - sys.stderr.write('Uploading: %s \r' %
|
| - MakeHumanReadable(total_bytes_transferred))
|
| - if total_size and total_bytes_transferred == total_size:
|
| - sys.stderr.write('\n')
|
| -
|
| - def _GetTransferHandlers(self, dst_uri, size, upload):
|
| - """
|
| - Selects upload/download and callback handlers.
|
| -
|
| - We use a callback handler that shows a simple textual progress indicator
|
| - if size is above the configurable threshold.
|
| -
|
| - We use a resumable transfer handler if size is >= the configurable
|
| - threshold and resumable transfers are supported by the given provider.
|
| - boto supports resumable downloads for all providers, but resumable
|
| - uploads are currently only supported by GS.
|
| -
|
| - Args:
|
| - dst_uri: the destination URI.
|
| - size: size of file (object) being uploaded (downloaded).
|
| - upload: bool indication of whether transfer is an upload.
|
| - """
|
| - config = boto.config
|
| - resumable_threshold = config.getint('GSUtil', 'resumable_threshold', TWO_MB)
|
| - transfer_handler = None
|
| - cb = None
|
| - num_cb = None
|
| -
|
| - # Checks whether the destination file is a "special" file, like /dev/null on
|
| - # Linux platforms or null on Windows platforms, so we can disable resumable
|
| - # download support since the file size of the destination won't ever be
|
| - # correct.
|
| - dst_is_special = False
|
| - if dst_uri.is_file_uri():
|
| - # Check explicitly first because os.stat doesn't work on 'nul' in Windows.
|
| - if dst_uri.object_name == os.devnull:
|
| - dst_is_special = True
|
| - try:
|
| - mode = os.stat(dst_uri.object_name).st_mode
|
| - if stat.S_ISCHR(mode):
|
| - dst_is_special = True
|
| - except OSError:
|
| - pass
|
| -
|
| - if size >= resumable_threshold and not dst_is_special:
|
| - cb = self._FileCopyCallbackHandler(upload, self.logger).call
|
| - num_cb = int(size / TWO_MB)
|
| -
|
| - if upload:
|
| - tracker_file_type = TrackerFileType.UPLOAD
|
| - else:
|
| - tracker_file_type = TrackerFileType.DOWNLOAD
|
| - tracker_file = self._GetTrackerFilePath(dst_uri, tracker_file_type)
|
| -
|
| - if upload:
|
| - if dst_uri.scheme == 'gs':
|
| - is_secure = BOTO_IS_SECURE
|
| - if not is_secure[0]:
|
| - self.logger.info('\n'.join(textwrap.wrap(
|
| - 'WARNING: Your boto config file (%s) has is_secure set to '
|
| - 'False. Resumable uploads are not secure when performed with '
|
| - 'this configuration, so large files are being uploaded with '
|
| - 'non-resumable uploads instead.' % GetConfigFilePath())))
|
| - else:
|
| - transfer_handler = ResumableUploadHandler(tracker_file)
|
| - else:
|
| - transfer_handler = ResumableDownloadHandler(tracker_file)
|
| -
|
| - return (cb, num_cb, transfer_handler)
|
| -
|
| - def _GetTrackerFilePath(self, dst_uri, tracker_file_type, src_uri=None):
|
| - resumable_tracker_dir = CreateTrackerDirIfNeeded()
|
| - if tracker_file_type == TrackerFileType.UPLOAD:
|
| - # Encode the dest bucket and object name into the tracker file name.
|
| - res_tracker_file_name = (
|
| - re.sub('[/\\\\]', '_', 'resumable_upload__%s__%s.url' %
|
| - (dst_uri.bucket_name, dst_uri.object_name)))
|
| - tracker_file_type_str = "upload"
|
| - elif tracker_file_type == TrackerFileType.DOWNLOAD:
|
| - # Encode the fully-qualified dest file name into the tracker file name.
|
| - res_tracker_file_name = (
|
| - re.sub('[/\\\\]', '_', 'resumable_download__%s.etag' %
|
| - (os.path.realpath(dst_uri.object_name))))
|
| - tracker_file_type_str = "download"
|
| - elif tracker_file_type == TrackerFileType.PARALLEL_UPLOAD:
|
| - # Encode the dest bucket and object names as well as the source file name
|
| - # into the tracker file name.
|
| - res_tracker_file_name = (
|
| - re.sub('[/\\\\]', '_', 'parallel_upload__%s__%s__%s.url' %
|
| - (dst_uri.bucket_name, dst_uri.object_name, src_uri)))
|
| - tracker_file_type_str = "parallel_upload"
|
| -
|
| - res_tracker_file_name = _HashFilename(res_tracker_file_name)
|
| - tracker_file_name = '%s_%s' % (tracker_file_type_str, res_tracker_file_name)
|
| - tracker_file_path = '%s%s%s' % (resumable_tracker_dir, os.sep,
|
| - tracker_file_name)
|
| - assert(len(tracker_file_name) < MAX_TRACKER_FILE_NAME_LENGTH)
|
| - return tracker_file_path
|
| -
|
| - def _LogCopyOperation(self, src_uri, dst_uri, headers):
|
| - """
|
| - Logs copy operation being performed, including Content-Type if appropriate.
|
| - """
|
| - if 'content-type' in headers and dst_uri.is_cloud_uri():
|
| - content_type_msg = ' [Content-Type=%s]' % headers['content-type']
|
| - else:
|
| - content_type_msg = ''
|
| - if src_uri.is_stream():
|
| - self.logger.info('Copying from <STDIN>%s...', content_type_msg)
|
| - else:
|
| - self.logger.info('Copying %s%s...', src_uri, content_type_msg)
|
| -
|
| - def _ProcessCopyObjectToObjectOptions(self, dst_uri, headers):
|
| - """
|
| - Common option processing between _CopyObjToObjInTheCloud and
|
| - _CopyObjToObjDaisyChainMode.
|
| - """
|
| - preserve_acl = False
|
| - canned_acl = None
|
| - if self.sub_opts:
|
| - for o, a in self.sub_opts:
|
| - if o == '-a':
|
| - canned_acls = dst_uri.canned_acls()
|
| - if a not in canned_acls:
|
| - raise CommandException('Invalid canned ACL "%s".' % a)
|
| - canned_acl = a
|
| - headers[dst_uri.get_provider().acl_header] = canned_acl
|
| - if o == '-p':
|
| - preserve_acl = True
|
| - if preserve_acl and canned_acl:
|
| - raise CommandException(
|
| - 'Specifying both the -p and -a options together is invalid.')
|
| - return (preserve_acl, canned_acl, headers)
|
| -
|
| - # We pass the headers explicitly to this call instead of using self.headers
|
| - # so we can set different metadata (like Content-Type type) for each object.
|
| - def _CopyObjToObjInTheCloud(self, src_key, src_uri, dst_uri, headers):
|
| - """Performs copy-in-the cloud from specified src to dest object.
|
| -
|
| - Args:
|
| - src_key: Source Key.
|
| - src_uri: Source StorageUri.
|
| - dst_uri: Destination StorageUri.
|
| - headers: A copy of the top-level headers dictionary.
|
| -
|
| - Returns:
|
| - (elapsed_time, bytes_transferred, dst_uri) excluding overhead like initial
|
| - HEAD. Note: At present copy-in-the-cloud doesn't return the generation of
|
| - the created object, so the returned URI is actually not version-specific
|
| - (unlike other cp cases).
|
| -
|
| - Raises:
|
| - CommandException: if errors encountered.
|
| - """
|
| - self._SetContentTypeHeader(src_uri, headers)
|
| - self._LogCopyOperation(src_uri, dst_uri, headers)
|
| - # Do Object -> object copy within same provider (uses
|
| - # x-<provider>-copy-source metadata HTTP header to request copying at the
|
| - # server).
|
| - src_bucket = src_uri.get_bucket(False, headers)
|
| - (preserve_acl, canned_acl, headers) = (
|
| - self._ProcessCopyObjectToObjectOptions(dst_uri, headers))
|
| - start_time = time.time()
|
| - # Pass headers in headers param not metadata param, so boto will copy
|
| - # existing key's metadata and just set the additional headers specified
|
| - # in the headers param (rather than using the headers to override existing
|
| - # metadata). In particular this allows us to copy the existing key's
|
| - # Content-Type and other metadata users need while still being able to
|
| - # set headers the API needs (like x-goog-project-id). Note that this means
|
| - # you can't do something like:
|
| - # gsutil cp -t Content-Type text/html gs://bucket/* gs://bucket2
|
| - # to change the Content-Type while copying.
|
| - dst_key = dst_uri.copy_key(
|
| - src_bucket.name, src_uri.object_name, preserve_acl=preserve_acl,
|
| - headers=headers, src_version_id=src_uri.version_id,
|
| - src_generation=src_uri.generation)
|
| -
|
| - end_time = time.time()
|
| - return (end_time - start_time, src_key.size,
|
| - dst_uri.clone_replace_key(dst_key))
|
| -
|
| - def _CheckFreeSpace(self, path):
|
| - """Return path/drive free space (in bytes)."""
|
| - if platform.system() == 'Windows':
|
| - from ctypes import c_int, c_uint64, c_wchar_p, windll, POINTER, WINFUNCTYPE, WinError
|
| - try:
|
| - GetDiskFreeSpaceEx = WINFUNCTYPE(c_int, c_wchar_p, POINTER(c_uint64),
|
| - POINTER(c_uint64), POINTER(c_uint64))
|
| - GetDiskFreeSpaceEx = GetDiskFreeSpaceEx(
|
| - ('GetDiskFreeSpaceExW', windll.kernel32), (
|
| - (1, 'lpszPathName'),
|
| - (2, 'lpFreeUserSpace'),
|
| - (2, 'lpTotalSpace'),
|
| - (2, 'lpFreeSpace'),))
|
| - except AttributeError:
|
| - GetDiskFreeSpaceEx = WINFUNCTYPE(c_int, c_char_p, POINTER(c_uint64),
|
| - POINTER(c_uint64), POINTER(c_uint64))
|
| - GetDiskFreeSpaceEx = GetDiskFreeSpaceEx(
|
| - ('GetDiskFreeSpaceExA', windll.kernel32), (
|
| - (1, 'lpszPathName'),
|
| - (2, 'lpFreeUserSpace'),
|
| - (2, 'lpTotalSpace'),
|
| - (2, 'lpFreeSpace'),))
|
| -
|
| - def GetDiskFreeSpaceEx_errcheck(result, func, args):
|
| - if not result:
|
| - raise WinError()
|
| - return args[1].value
|
| - GetDiskFreeSpaceEx.errcheck = GetDiskFreeSpaceEx_errcheck
|
| -
|
| - return GetDiskFreeSpaceEx(os.getenv('SystemDrive'))
|
| - else:
|
| - (_, f_frsize, _, _, f_bavail, _, _, _, _, _) = os.statvfs(path)
|
| - return f_frsize * f_bavail
|
| -
|
| - def _PerformResumableUploadIfApplies(self, fp, src_uri, dst_uri, canned_acl,
|
| - headers, file_size, already_split=False):
|
| - """
|
| - Performs resumable upload if supported by provider and file is above
|
| - threshold, else performs non-resumable upload.
|
| -
|
| - Returns (elapsed_time, bytes_transferred, version-specific dst_uri).
|
| - """
|
| - start_time = time.time()
|
| - (cb, num_cb, res_upload_handler) = self._GetTransferHandlers(
|
| - dst_uri, file_size, True)
|
| - if dst_uri.scheme == 'gs':
|
| - # Resumable upload protocol is Google Cloud Storage-specific.
|
| - dst_uri.set_contents_from_file(fp, headers, policy=canned_acl,
|
| - cb=cb, num_cb=num_cb,
|
| - res_upload_handler=res_upload_handler)
|
| - else:
|
| - dst_uri.set_contents_from_file(fp, headers, policy=canned_acl,
|
| - cb=cb, num_cb=num_cb)
|
| - if res_upload_handler:
|
| - # ResumableUploadHandler does not update upload_start_point from its
|
| - # initial value of -1 if transferring the whole file, so clamp at 0
|
| - bytes_transferred = file_size - max(
|
| - res_upload_handler.upload_start_point, 0)
|
| - if self.use_manifest and not already_split:
|
| - # Save the upload indentifier in the manifest file, unless we're
|
| - # uploading a temporary component for parallel composite uploads.
|
| - self.manifest.Set(
|
| - src_uri, 'upload_id', res_upload_handler.get_upload_id())
|
| - else:
|
| - bytes_transferred = file_size
|
| - end_time = time.time()
|
| - return (end_time - start_time, bytes_transferred, dst_uri)
|
| -
|
| - def _PerformStreamingUpload(self, fp, dst_uri, headers, canned_acl=None):
|
| - """
|
| - Performs a streaming upload to the cloud.
|
| -
|
| - Args:
|
| - fp: The file whose contents to upload.
|
| - dst_uri: Destination StorageUri.
|
| - headers: A copy of the top-level headers dictionary.
|
| - canned_acl: Optional canned ACL to set on the object.
|
| -
|
| - Returns (elapsed_time, bytes_transferred, version-specific dst_uri).
|
| - """
|
| - start_time = time.time()
|
| -
|
| - cb = self._StreamCopyCallbackHandler(self.logger).call
|
| - dst_uri.set_contents_from_stream(
|
| - fp, headers, policy=canned_acl, cb=cb)
|
| - try:
|
| - bytes_transferred = fp.tell()
|
| - except:
|
| - bytes_transferred = 0
|
| -
|
| - end_time = time.time()
|
| - return (end_time - start_time, bytes_transferred, dst_uri)
|
| -
|
| - def _SetContentTypeHeader(self, src_uri, headers):
|
| - """
|
| - Sets content type header to value specified in '-h Content-Type' option (if
|
| - specified); else sets using Content-Type detection.
|
| - """
|
| - if 'content-type' in headers:
|
| - # If empty string specified (i.e., -h "Content-Type:") set header to None,
|
| - # which will inhibit boto from sending the CT header. Otherwise, boto will
|
| - # pass through the user specified CT header.
|
| - if not headers['content-type']:
|
| - headers['content-type'] = None
|
| - # else we'll keep the value passed in via -h option (not performing
|
| - # content type detection).
|
| - else:
|
| - # Only do content type recognition is src_uri is a file. Object-to-object
|
| - # copies with no -h Content-Type specified re-use the content type of the
|
| - # source object.
|
| - if src_uri.is_file_uri():
|
| - object_name = src_uri.object_name
|
| - content_type = None
|
| - # Streams (denoted by '-') are expected to be 'application/octet-stream'
|
| - # and 'file' would partially consume them.
|
| - if object_name != '-':
|
| - if self.USE_MAGICFILE:
|
| - p = subprocess.Popen(['file', '--mime-type', object_name],
|
| - stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
| - output, error = p.communicate()
|
| - if p.returncode != 0 or error:
|
| - raise CommandException(
|
| - 'Encountered error running "file --mime-type %s" '
|
| - '(returncode=%d).\n%s' % (object_name, p.returncode, error))
|
| - # Parse output by removing line delimiter and splitting on last ":
|
| - content_type = output.rstrip().rpartition(': ')[2]
|
| - else:
|
| - content_type = mimetypes.guess_type(object_name)[0]
|
| - if not content_type:
|
| - content_type = self.DEFAULT_CONTENT_TYPE
|
| - headers['content-type'] = content_type
|
| -
|
| - def _GetFileSize(self, fp):
|
| - """Determines file size different ways for case where fp is actually a
|
| - wrapper around a Key vs an actual file.
|
| -
|
| - Args:
|
| - The file whose size we wish to determine.
|
| -
|
| - Returns:
|
| - The size of the file, in bytes.
|
| - """
|
| - if isinstance(fp, KeyFile):
|
| - return fp.getkey().size
|
| - else:
|
| - return os.path.getsize(fp.name)
|
| -
|
| - def _UploadFileToObject(self, src_key, src_uri, dst_uri, headers,
|
| - should_log=True, allow_splitting=True):
|
| - """Uploads a local file to an object.
|
| -
|
| - Args:
|
| - src_key: Source StorageUri. Must be a file URI.
|
| - src_uri: Source StorageUri.
|
| - dst_uri: Destination StorageUri.
|
| - headers: The headers dictionary.
|
| - should_log: bool indicator whether we should log this operation.
|
| - Returns:
|
| - (elapsed_time, bytes_transferred, version-specific dst_uri), excluding
|
| - overhead like initial HEAD.
|
| -
|
| - Raises:
|
| - CommandException: if errors encountered.
|
| - """
|
| - gzip_exts = []
|
| - canned_acl = None
|
| - if self.sub_opts:
|
| - for o, a in self.sub_opts:
|
| - if o == '-a':
|
| - canned_acls = dst_uri.canned_acls()
|
| - if a not in canned_acls:
|
| - raise CommandException('Invalid canned ACL "%s".' % a)
|
| - canned_acl = a
|
| - elif o == '-t':
|
| - self.logger.warning(
|
| - 'Warning: -t is deprecated, and will be removed in the future. '
|
| - 'Content type\ndetection is '
|
| - 'now performed by default, unless inhibited by specifying '
|
| - 'a\nContent-Type header via the -h option.')
|
| - elif o == '-z':
|
| - gzip_exts = a.split(',')
|
| -
|
| - self._SetContentTypeHeader(src_uri, headers)
|
| - if should_log:
|
| - self._LogCopyOperation(src_uri, dst_uri, headers)
|
| -
|
| - if 'content-language' not in headers:
|
| - content_language = config.get_value('GSUtil', 'content_language')
|
| - if content_language:
|
| - headers['content-language'] = content_language
|
| -
|
| - fname_parts = src_uri.object_name.split('.')
|
| - if len(fname_parts) > 1 and fname_parts[-1] in gzip_exts:
|
| - self.logger.debug('Compressing %s (to tmp)...', src_key)
|
| - (gzip_fh, gzip_path) = tempfile.mkstemp()
|
| - gzip_fp = None
|
| - try:
|
| - # Check for temp space. Assume the compressed object is at most 2x
|
| - # the size of the object (normally should compress to smaller than
|
| - # the object)
|
| - if (self._CheckFreeSpace(gzip_path)
|
| - < 2*int(os.path.getsize(src_key.name))):
|
| - raise CommandException('Inadequate temp space available to compress '
|
| - '%s' % src_key.name)
|
| - gzip_fp = gzip.open(gzip_path, 'wb')
|
| - gzip_fp.writelines(src_key.fp)
|
| - finally:
|
| - if gzip_fp:
|
| - gzip_fp.close()
|
| - os.close(gzip_fh)
|
| - headers['content-encoding'] = 'gzip'
|
| - gzip_fp = open(gzip_path, 'rb')
|
| - try:
|
| - file_size = self._GetFileSize(gzip_fp)
|
| - (elapsed_time, bytes_transferred, result_uri) = (
|
| - self._PerformResumableUploadIfApplies(gzip_fp, src_uri, dst_uri,
|
| - canned_acl, headers,
|
| - file_size))
|
| - finally:
|
| - gzip_fp.close()
|
| - try:
|
| - os.unlink(gzip_path)
|
| - # Windows sometimes complains the temp file is locked when you try to
|
| - # delete it.
|
| - except Exception, e:
|
| - pass
|
| - elif (src_key.is_stream()
|
| - and dst_uri.get_provider().supports_chunked_transfer()):
|
| - (elapsed_time, bytes_transferred, result_uri) = (
|
| - self._PerformStreamingUpload(src_key.fp, dst_uri, headers,
|
| - canned_acl))
|
| - else:
|
| - if src_key.is_stream():
|
| - # For Providers that don't support chunked Transfers
|
| - tmp = tempfile.NamedTemporaryFile()
|
| - file_uri = self.suri_builder.StorageUri('file://%s' % tmp.name)
|
| - try:
|
| - file_uri.new_key(False, headers).set_contents_from_file(
|
| - src_key.fp, headers)
|
| - src_key = file_uri.get_key()
|
| - finally:
|
| - file_uri.close()
|
| - try:
|
| - fp = src_key.fp
|
| - file_size = self._GetFileSize(fp)
|
| - if self._ShouldDoParallelCompositeUpload(allow_splitting, src_key,
|
| - dst_uri, file_size):
|
| - (elapsed_time, bytes_transferred, result_uri) = (
|
| - self._DoParallelCompositeUpload(fp, src_uri, dst_uri, headers,
|
| - canned_acl, file_size))
|
| - else:
|
| - (elapsed_time, bytes_transferred, result_uri) = (
|
| - self._PerformResumableUploadIfApplies(
|
| - src_key.fp, src_uri, dst_uri, canned_acl, headers,
|
| - self._GetFileSize(src_key.fp)))
|
| - finally:
|
| - if src_key.is_stream():
|
| - tmp.close()
|
| - else:
|
| - src_key.close()
|
| - return (elapsed_time, bytes_transferred, result_uri)
|
| -
|
| - def _GetHashAlgs(self, key):
|
| - hash_algs = {}
|
| - check_hashes_config = config.get(
|
| - 'GSUtil', 'check_hashes', 'if_fast_else_fail')
|
| - if check_hashes_config == 'never':
|
| - return hash_algs
|
| - if self._GetMD5FromETag(key):
|
| - hash_algs['md5'] = md5
|
| - if hasattr(key, 'cloud_hashes') and key.cloud_hashes:
|
| - if 'md5' in key.cloud_hashes:
|
| - hash_algs['md5'] = md5
|
| - # If the cloud provider supplies a CRC, we'll compute a checksum to
|
| - # validate if we're using a native crcmod installation or MD5 isn't
|
| - # offered as an alternative.
|
| - if 'crc32c' in key.cloud_hashes:
|
| - if UsingCrcmodExtension(crcmod):
|
| - hash_algs['crc32c'] = lambda: crcmod.predefined.Crc('crc-32c')
|
| - elif not hash_algs:
|
| - if check_hashes_config == 'if_fast_else_fail':
|
| - raise SLOW_CRC_EXCEPTION
|
| - elif check_hashes_config == 'if_fast_else_skip':
|
| - sys.stderr.write(NO_HASH_CHECK_WARNING)
|
| - elif check_hashes_config == 'always':
|
| - sys.stderr.write(SLOW_CRC_WARNING)
|
| - hash_algs['crc32c'] = lambda: crcmod.predefined.Crc('crc-32c')
|
| - else:
|
| - raise CommandException(
|
| - 'Your boto config \'check_hashes\' option is misconfigured.')
|
| - elif not hash_algs:
|
| - if check_hashes_config == 'if_fast_else_skip':
|
| - sys.stderr.write(NO_SERVER_HASH_WARNING)
|
| - else:
|
| - raise NO_SERVER_HASH_EXCEPTION
|
| - return hash_algs
|
| -
|
| - def _DownloadObjectToFile(self, src_key, src_uri, dst_uri, headers,
|
| - should_log=True):
|
| - """Downloads an object to a local file.
|
| -
|
| - Args:
|
| - src_key: Source Key.
|
| - src_uri: Source StorageUri.
|
| - dst_uri: Destination StorageUri.
|
| - headers: The headers dictionary.
|
| - should_log: bool indicator whether we should log this operation.
|
| - Returns:
|
| - (elapsed_time, bytes_transferred, dst_uri), excluding overhead like
|
| - initial HEAD.
|
| -
|
| - Raises:
|
| - CommandException: if errors encountered.
|
| - """
|
| - if should_log:
|
| - self._LogCopyOperation(src_uri, dst_uri, headers)
|
| - (cb, num_cb, res_download_handler) = self._GetTransferHandlers(
|
| - dst_uri, src_key.size, False)
|
| - file_name = dst_uri.object_name
|
| - dir_name = os.path.dirname(file_name)
|
| - if dir_name and not os.path.exists(dir_name):
|
| - # Do dir creation in try block so can ignore case where dir already
|
| - # exists. This is needed to avoid a race condition when running gsutil
|
| - # -m cp.
|
| - try:
|
| - os.makedirs(dir_name)
|
| - except OSError, e:
|
| - if e.errno != errno.EEXIST:
|
| - raise
|
| - # For gzipped objects, download to a temp file and unzip.
|
| - if (hasattr(src_key, 'content_encoding')
|
| - and src_key.content_encoding == 'gzip'):
|
| - # We can't use tempfile.mkstemp() here because we need a predictable
|
| - # filename for resumable downloads.
|
| - download_file_name = '%s_.gztmp' % file_name
|
| - need_to_unzip = True
|
| - else:
|
| - download_file_name = file_name
|
| - need_to_unzip = False
|
| -
|
| - hash_algs = self._GetHashAlgs(src_key)
|
| -
|
| - # Add accept encoding for download operation.
|
| - AddAcceptEncoding(headers)
|
| -
|
| - fp = None
|
| - try:
|
| - if res_download_handler:
|
| - fp = open(download_file_name, 'ab')
|
| - else:
|
| - fp = open(download_file_name, 'wb')
|
| - start_time = time.time()
|
| - # Use our hash_algs if get_contents_to_file() will accept them, else the
|
| - # default (md5-only) will suffice.
|
| - try:
|
| - src_key.get_contents_to_file(fp, headers, cb=cb, num_cb=num_cb,
|
| - res_download_handler=res_download_handler,
|
| - hash_algs=hash_algs)
|
| - except TypeError:
|
| - src_key.get_contents_to_file(fp, headers, cb=cb, num_cb=num_cb,
|
| - res_download_handler=res_download_handler)
|
| -
|
| - # If a custom test method is defined, call it here. For the copy command,
|
| - # test methods are expected to take one argument: an open file pointer,
|
| - # and are used to perturb the open file during download to exercise
|
| - # download error detection.
|
| - if self.test_method:
|
| - self.test_method(fp)
|
| - end_time = time.time()
|
| - finally:
|
| - if fp:
|
| - fp.close()
|
| -
|
| - if (not need_to_unzip and
|
| - hasattr(src_key, 'content_encoding')
|
| - and src_key.content_encoding == 'gzip'):
|
| - # TODO: HEAD requests are currently not returning proper Content-Encoding
|
| - # headers when an object is gzip-encoded on-the-fly. Remove this once
|
| - # it's fixed.
|
| - renamed_file_name = '%s_.gztmp' % file_name
|
| - os.rename(download_file_name, renamed_file_name)
|
| - download_file_name = renamed_file_name
|
| - need_to_unzip = True
|
| -
|
| - # Discard all hashes if we are resuming a partial download.
|
| - if res_download_handler and res_download_handler.download_start_point:
|
| - src_key.local_hashes = {}
|
| -
|
| - # Verify downloaded file checksum matched source object's checksum.
|
| - digest_verified = True
|
| - computed_hashes = None
|
| - try:
|
| - self._CheckHashes(src_key, download_file_name, hash_algs)
|
| - except CommandException, e:
|
| - # If the digest doesn't match, we'll try checking it again after
|
| - # unzipping.
|
| - if (not need_to_unzip or
|
| - 'doesn\'t match cloud-supplied digest' not in str(e)):
|
| - os.unlink(download_file_name)
|
| - raise
|
| - digest_verified = False
|
| - computed_hashes = dict(
|
| - (alg, digester())
|
| - for alg, digester in self._GetHashAlgs(src_key).iteritems())
|
| -
|
| - if res_download_handler:
|
| - bytes_transferred = (
|
| - src_key.size - res_download_handler.download_start_point)
|
| - else:
|
| - bytes_transferred = src_key.size
|
| -
|
| - if need_to_unzip:
|
| - # Log that we're uncompressing if the file is big enough that
|
| - # decompressing would make it look like the transfer "stalled" at the end.
|
| - if bytes_transferred > 10 * 1024 * 1024:
|
| - self.logger.info('Uncompressing downloaded tmp file to %s...',
|
| - file_name)
|
| -
|
| - # Downloaded gzipped file to a filename w/o .gz extension, so unzip.
|
| - f_in = gzip.open(download_file_name, 'rb')
|
| - with open(file_name, 'wb') as f_out:
|
| - data = f_in.read(self.GUNZIP_CHUNK_SIZE)
|
| - while data:
|
| - f_out.write(data)
|
| - if computed_hashes:
|
| - # Compute digests again on the uncompressed data.
|
| - for alg in computed_hashes.itervalues():
|
| - alg.update(data)
|
| - data = f_in.read(self.GUNZIP_CHUNK_SIZE)
|
| - f_in.close()
|
| -
|
| - os.unlink(download_file_name)
|
| -
|
| - if not digest_verified:
|
| - computed_hashes = dict((alg, digester.digest())
|
| - for alg, digester in computed_hashes.iteritems())
|
| - try:
|
| - self._CheckHashes(
|
| - src_key, file_name, hash_algs, computed_hashes=computed_hashes)
|
| - except CommandException, e:
|
| - os.unlink(file_name)
|
| - raise
|
| -
|
| - return (end_time - start_time, bytes_transferred, dst_uri)
|
| -
|
| - def _PerformDownloadToStream(self, src_key, src_uri, str_fp, headers):
|
| - (cb, num_cb, res_download_handler) = self._GetTransferHandlers(
|
| - src_uri, src_key.size, False)
|
| - start_time = time.time()
|
| - src_key.get_contents_to_file(str_fp, headers, cb=cb, num_cb=num_cb)
|
| - end_time = time.time()
|
| - bytes_transferred = src_key.size
|
| - end_time = time.time()
|
| - return (end_time - start_time, bytes_transferred)
|
| -
|
| - def _CopyFileToFile(self, src_key, src_uri, dst_uri, headers):
|
| - """Copies a local file to a local file.
|
| -
|
| - Args:
|
| - src_key: Source StorageUri. Must be a file URI.
|
| - src_uri: Source StorageUri.
|
| - dst_uri: Destination StorageUri.
|
| - headers: The headers dictionary.
|
| - Returns:
|
| - (elapsed_time, bytes_transferred, dst_uri), excluding
|
| - overhead like initial HEAD.
|
| -
|
| - Raises:
|
| - CommandException: if errors encountered.
|
| - """
|
| - self._LogCopyOperation(src_uri, dst_uri, headers)
|
| - dst_key = dst_uri.new_key(False, headers)
|
| - start_time = time.time()
|
| - dst_key.set_contents_from_file(src_key.fp, headers)
|
| - end_time = time.time()
|
| - return (end_time - start_time, os.path.getsize(src_key.fp.name), dst_uri)
|
| -
|
| - def _CopyObjToObjDaisyChainMode(self, src_key, src_uri, dst_uri, headers):
|
| - """Copies from src_uri to dst_uri in "daisy chain" mode.
|
| - See -D OPTION documentation about what daisy chain mode is.
|
| -
|
| - Args:
|
| - src_key: Source Key.
|
| - src_uri: Source StorageUri.
|
| - dst_uri: Destination StorageUri.
|
| - headers: A copy of the top-level headers dictionary.
|
| -
|
| - Returns:
|
| - (elapsed_time, bytes_transferred, version-specific dst_uri) excluding
|
| - overhead like initial HEAD.
|
| -
|
| - Raises:
|
| - CommandException: if errors encountered.
|
| - """
|
| - # Start with copy of input headers, so we'll include any headers that need
|
| - # to be set from higher up in call stack (like x-goog-if-generation-match).
|
| - headers = headers.copy()
|
| - # Now merge headers from src_key so we'll preserve metadata.
|
| - # Unfortunately boto separates headers into ones it puts in the metadata
|
| - # dict and ones it pulls out into specific key fields, so we need to walk
|
| - # through the latter list to find the headers that we copy over to the dest
|
| - # object.
|
| - for header_name, field_name in (
|
| - ('cache-control', 'cache_control'),
|
| - ('content-type', 'content_type'),
|
| - ('content-language', 'content_language'),
|
| - ('content-encoding', 'content_encoding'),
|
| - ('content-disposition', 'content_disposition')):
|
| - value = getattr(src_key, field_name, None)
|
| - if value:
|
| - headers[header_name] = value
|
| - # Boto represents x-goog-meta-* headers in metadata dict with the
|
| - # x-goog-meta- or x-amx-meta- prefix stripped. Turn these back into headers
|
| - # for the destination object.
|
| - for name, value in src_key.metadata.items():
|
| - header_name = '%smeta-%s' % (dst_uri.get_provider().header_prefix, name)
|
| - headers[header_name] = value
|
| - # Set content type if specified in '-h Content-Type' option.
|
| - self._SetContentTypeHeader(src_uri, headers)
|
| - self._LogCopyOperation(src_uri, dst_uri, headers)
|
| - (preserve_acl, canned_acl, headers) = (
|
| - self._ProcessCopyObjectToObjectOptions(dst_uri, headers))
|
| - if preserve_acl:
|
| - if src_uri.get_provider() != dst_uri.get_provider():
|
| - # We don't attempt to preserve ACLs across providers because
|
| - # GCS and S3 support different ACLs and disjoint principals.
|
| - raise NotImplementedError('Cross-provider cp -p not supported')
|
| - # We need to read and write the ACL manually because the
|
| - # Key.set_contents_from_file() API doesn't provide a preserve_acl
|
| - # parameter (unlike the Bucket.copy_key() API used
|
| - # by_CopyObjToObjInTheCloud).
|
| - acl = src_uri.get_acl(headers=headers)
|
| - fp = KeyFile(src_key)
|
| - result = self._PerformResumableUploadIfApplies(fp, src_uri,
|
| - dst_uri, canned_acl, headers,
|
| - self._GetFileSize(fp))
|
| - if preserve_acl:
|
| - # If user specified noclobber flag, we need to remove the
|
| - # x-goog-if-generation-match:0 header that was set when uploading the
|
| - # object, because that precondition would fail when updating the ACL on
|
| - # the now-existing object.
|
| - if self.no_clobber:
|
| - del headers['x-goog-if-generation-match']
|
| - # Remove the owner field from the ACL in case we're copying from an object
|
| - # that is owned by a different user. If we left that other user in the
|
| - # ACL, attempting to set the ACL would result in a 400 (Bad Request).
|
| - if hasattr(acl, 'owner'):
|
| - del acl.owner
|
| - dst_uri.set_acl(acl, dst_uri.object_name, headers=headers)
|
| - return result
|
| -
|
| - def _PerformCopy(self, src_uri, dst_uri, allow_splitting=True):
|
| - """Performs copy from src_uri to dst_uri, handling various special cases.
|
| -
|
| - Args:
|
| - src_uri: Source StorageUri.
|
| - dst_uri: Destination StorageUri.
|
| - allow_splitting: Whether to allow the file to be split into component
|
| - pieces for an parallel composite upload.
|
| -
|
| - Returns:
|
| - (elapsed_time, bytes_transferred, version-specific dst_uri) excluding
|
| - overhead like initial HEAD.
|
| -
|
| - Raises:
|
| - CommandException: if errors encountered.
|
| - """
|
| - # Make a copy of the input headers each time so we can set a different
|
| - # content type for each object.
|
| - headers = self.headers.copy() if self.headers else {}
|
| - download_headers = headers.copy()
|
| - # Add accept encoding for download operation.
|
| - AddAcceptEncoding(download_headers)
|
| -
|
| - src_key = src_uri.get_key(False, download_headers)
|
| - if not src_key:
|
| - raise CommandException('"%s" does not exist.' % src_uri)
|
| -
|
| - if self.use_manifest:
|
| - # Set the source size in the manifest.
|
| - self.manifest.Set(src_uri, 'size', getattr(src_key, 'size', None))
|
| -
|
| - # On Windows, stdin is opened as text mode instead of binary which causes
|
| - # problems when piping a binary file, so this switches it to binary mode.
|
| - if IS_WINDOWS and src_uri.is_file_uri() and src_key.is_stream():
|
| - import msvcrt
|
| - msvcrt.setmode(src_key.fp.fileno(), os.O_BINARY)
|
| -
|
| - if self.no_clobber:
|
| - # There are two checks to prevent clobbering:
|
| - # 1) The first check is to see if the item
|
| - # already exists at the destination and prevent the upload/download
|
| - # from happening. This is done by the exists() call.
|
| - # 2) The second check is only relevant if we are writing to gs. We can
|
| - # enforce that the server only writes the object if it doesn't exist
|
| - # by specifying the header below. This check only happens at the
|
| - # server after the complete file has been uploaded. We specify this
|
| - # header to prevent a race condition where a destination file may
|
| - # be created after the first check and before the file is fully
|
| - # uploaded.
|
| - # In order to save on unnecessary uploads/downloads we perform both
|
| - # checks. However, this may come at the cost of additional HTTP calls.
|
| - if dst_uri.exists(download_headers):
|
| - if dst_uri.is_file_uri():
|
| - # The local file may be a partial. Check the file sizes.
|
| - if src_key.size == dst_uri.get_key(headers=download_headers).size:
|
| - raise ItemExistsError()
|
| - else:
|
| - raise ItemExistsError()
|
| - if dst_uri.is_cloud_uri() and dst_uri.scheme == 'gs':
|
| - headers['x-goog-if-generation-match'] = '0'
|
| -
|
| - if src_uri.is_cloud_uri() and dst_uri.is_cloud_uri():
|
| - if src_uri.scheme == dst_uri.scheme and not self.daisy_chain:
|
| - return self._CopyObjToObjInTheCloud(src_key, src_uri, dst_uri, headers)
|
| - else:
|
| - return self._CopyObjToObjDaisyChainMode(src_key, src_uri, dst_uri,
|
| - headers)
|
| - elif src_uri.is_file_uri() and dst_uri.is_cloud_uri():
|
| - return self._UploadFileToObject(src_key, src_uri, dst_uri,
|
| - download_headers)
|
| - elif src_uri.is_cloud_uri() and dst_uri.is_file_uri():
|
| - return self._DownloadObjectToFile(src_key, src_uri, dst_uri,
|
| - download_headers)
|
| - elif src_uri.is_file_uri() and dst_uri.is_file_uri():
|
| - return self._CopyFileToFile(src_key, src_uri, dst_uri, download_headers)
|
| - else:
|
| - raise CommandException('Unexpected src/dest case')
|
| -
|
| - def _PartitionFile(self, fp, file_size, src_uri, headers, canned_acl, bucket,
|
| - random_prefix, tracker_file, tracker_file_lock):
|
| - """Partitions a file into FilePart objects to be uploaded and later composed
|
| - into an object matching the original file. This entails splitting the
|
| - file into parts, naming and forming a destination URI for each part,
|
| - and also providing the PerformResumableUploadIfAppliesArgs object
|
| - corresponding to each part.
|
| -
|
| - Args:
|
| - fp: The file object to be partitioned.
|
| - file_size: The size of fp, in bytes.
|
| - src_uri: The source StorageUri fromed from the original command.
|
| - headers: The headers which ultimately passed to boto.
|
| - canned_acl: The user-provided canned_acl, if applicable.
|
| - bucket: The name of the destination bucket, of the form gs://bucket
|
| - random_prefix: The randomly-generated prefix used to prevent collisions
|
| - among the temporary component names.
|
| - tracker_file: The path to the parallel composite upload tracker file.
|
| - tracker_file_lock: The lock protecting access to the tracker file.
|
| -
|
| - Returns:
|
| - dst_args: The destination URIs for the temporary component objects.
|
| - """
|
| - parallel_composite_upload_component_size = HumanReadableToBytes(
|
| - boto.config.get('GSUtil', 'parallel_composite_upload_component_size',
|
| - DEFAULT_PARALLEL_COMPOSITE_UPLOAD_COMPONENT_SIZE))
|
| - (num_components, component_size) = _GetPartitionInfo(file_size,
|
| - MAX_COMPOSE_ARITY, parallel_composite_upload_component_size)
|
| -
|
| - # Make sure that the temporary objects don't already exist.
|
| - tmp_object_headers = copy.deepcopy(headers)
|
| - tmp_object_headers['x-goog-if-generation-match'] = '0'
|
| -
|
| - uri_strs = [] # Used to create a NameExpansionIterator.
|
| - dst_args = {} # Arguments to create commands and pass to subprocesses.
|
| - file_names = [] # Used for the 2-step process of forming dst_args.
|
| - for i in range(num_components):
|
| - # "Salt" the object name with something a user is very unlikely to have
|
| - # used in an object name, then hash the extended name to make sure
|
| - # we don't run into problems with name length. Using a deterministic
|
| - # naming scheme for the temporary components allows users to take
|
| - # advantage of resumable uploads for each component.
|
| - encoded_name = (PARALLEL_UPLOAD_STATIC_SALT + fp.name).encode('utf-8')
|
| - content_md5 = md5()
|
| - content_md5.update(encoded_name)
|
| - digest = content_md5.hexdigest()
|
| - temp_file_name = (random_prefix + PARALLEL_UPLOAD_TEMP_NAMESPACE +
|
| - digest + '_' + str(i))
|
| - tmp_dst_uri = MakeGsUri(bucket, temp_file_name, self.suri_builder)
|
| -
|
| - if i < (num_components - 1):
|
| - # Every component except possibly the last is the same size.
|
| - file_part_length = component_size
|
| - else:
|
| - # The last component just gets all of the remaining bytes.
|
| - file_part_length = (file_size - ((num_components -1) * component_size))
|
| - offset = i * component_size
|
| - func_args = PerformResumableUploadIfAppliesArgs(
|
| - fp.name, offset, file_part_length, src_uri, tmp_dst_uri, canned_acl,
|
| - headers, tracker_file, tracker_file_lock)
|
| - file_names.append(temp_file_name)
|
| - dst_args[temp_file_name] = func_args
|
| - uri_strs.append(self._MakeGsUriStr(bucket, temp_file_name))
|
| -
|
| - return dst_args
|
| -
|
| - def _MakeFileUri(self, filename):
|
| - """Returns a StorageUri for a local file."""
|
| - return self.suri_builder.StorageUri(filename)
|
| -
|
| - def _MakeGsUriStr(self, bucket, filename):
|
| - """Returns a string of the form gs://bucket/filename, used to indicate an
|
| - object in Google Cloud Storage.
|
| - """
|
| - return 'gs://' + bucket + '/' + filename
|
| -
|
| - def _DoParallelCompositeUpload(self, fp, src_uri, dst_uri, headers,
|
| - canned_acl, file_size):
|
| - """Uploads a local file to an object in the cloud for the Parallel Composite
|
| - Uploads feature. The file is partitioned into parts, and then the parts
|
| - are uploaded in parallel, composed to form the original destination
|
| - object, and deleted.
|
| -
|
| - Args:
|
| - fp: The file object to be uploaded.
|
| - src_uri: The StorageURI of the local file.
|
| - dst_uri: The StorageURI of the destination file.
|
| - headers: The headers to pass to boto, if any.
|
| - canned_acl: The canned acl to apply to the object, if any.
|
| - file_size: The size of the source file in bytes.
|
| - """
|
| - start_time = time.time()
|
| - gs_prefix = 'gs://'
|
| - bucket = gs_prefix + dst_uri.bucket_name
|
| - if 'content-type' in headers and not headers['content-type']:
|
| - del headers['content-type']
|
| -
|
| - # Determine which components, if any, have already been successfully
|
| - # uploaded.
|
| - tracker_file = self._GetTrackerFilePath(dst_uri,
|
| - TrackerFileType.PARALLEL_UPLOAD,
|
| - src_uri)
|
| - tracker_file_lock = CreateLock()
|
| - (random_prefix, existing_components) = (
|
| - _ParseParallelUploadTrackerFile(tracker_file, tracker_file_lock))
|
| -
|
| - # Create the initial tracker file for the upload.
|
| - _CreateParallelUploadTrackerFile(tracker_file, random_prefix,
|
| - existing_components, tracker_file_lock)
|
| -
|
| - # Get the set of all components that should be uploaded.
|
| - dst_args = self._PartitionFile(fp, file_size, src_uri, headers, canned_acl,
|
| - bucket, random_prefix, tracker_file,
|
| - tracker_file_lock)
|
| -
|
| - (components_to_upload, existing_components, existing_objects_to_delete) = (
|
| - FilterExistingComponents(dst_args, existing_components, bucket,
|
| - self.suri_builder))
|
| -
|
| - # In parallel, copy all of the file parts that haven't already been
|
| - # uploaded to temporary objects.
|
| - cp_results = self.Apply(_PerformResumableUploadIfAppliesWrapper,
|
| - components_to_upload,
|
| - _CopyExceptionHandler,
|
| - ('copy_failure_count', 'total_bytes_transferred'),
|
| - arg_checker=gslib.command.DummyArgChecker,
|
| - parallel_operations_override=True,
|
| - should_return_results=True)
|
| - uploaded_components = []
|
| - total_bytes_uploaded = 0
|
| - for cp_result in cp_results:
|
| - total_bytes_uploaded += cp_result[1]
|
| - uploaded_components.append(cp_result[2])
|
| - components = uploaded_components + existing_components
|
| -
|
| - if len(components) == len(dst_args):
|
| - # Only try to compose if all of the components were uploaded successfully.
|
| -
|
| - # Sort the components so that they will be composed in the correct order.
|
| - components = sorted(
|
| - components, key=lambda component:
|
| - int(component.object_name[component.object_name.rfind('_')+1:]))
|
| - result_uri = dst_uri.compose(components, headers=headers)
|
| -
|
| - try:
|
| - # Make sure only to delete things that we know were successfully
|
| - # uploaded (as opposed to all of the objects that we attempted to
|
| - # create) so that we don't delete any preexisting objects, except for
|
| - # those that were uploaded by a previous, failed run and have since
|
| - # changed (but still have an old generation lying around).
|
| - objects_to_delete = components + existing_objects_to_delete
|
| - self.Apply(_DeleteKeyFn, objects_to_delete, _RmExceptionHandler,
|
| - arg_checker=gslib.command.DummyArgChecker,
|
| - parallel_operations_override=True)
|
| - except Exception, e:
|
| - if (e.message and ('unexpected failure in' in e.message)
|
| - and ('sub-processes, aborting' in e.message)):
|
| - # If some of the delete calls fail, don't cause the whole command to
|
| - # fail. The copy was successful iff the compose call succeeded, so
|
| - # just raise whatever exception (if any) happened before this instead,
|
| - # and reduce this to a warning.
|
| - logging.warning(
|
| - 'Failed to delete some of the following temporary objects:\n' +
|
| - '\n'.join(dst_args.keys()))
|
| - else:
|
| - raise e
|
| - finally:
|
| - with tracker_file_lock:
|
| - if os.path.exists(tracker_file):
|
| - os.unlink(tracker_file)
|
| - else:
|
| - # Some of the components failed to upload. In this case, we want to exit
|
| - # without deleting the objects.
|
| - raise CommandException(
|
| - 'Some temporary components were not uploaded successfully. '
|
| - 'Please retry this upload.')
|
| -
|
| - return (time.time() - start_time, total_bytes_uploaded, result_uri)
|
| -
|
| - def _ShouldDoParallelCompositeUpload(self, allow_splitting, src_key, dst_uri,
|
| - file_size):
|
| - """Returns True iff a parallel upload should be performed on the source key.
|
| -
|
| - Args:
|
| - allow_splitting: If false, then this function returns false.
|
| - src_key: Corresponding to a local file.
|
| - dst_uri: Corresponding to an object in the cloud.
|
| - file_size: The size of the source file, in bytes.
|
| - """
|
| - parallel_composite_upload_threshold = HumanReadableToBytes(boto.config.get(
|
| - 'GSUtil', 'parallel_composite_upload_threshold',
|
| - DEFAULT_PARALLEL_COMPOSITE_UPLOAD_THRESHOLD))
|
| - return (allow_splitting # Don't split the pieces multiple times.
|
| - and not src_key.is_stream() # We can't partition streams.
|
| - and dst_uri.scheme == 'gs' # Compose is only for gs.
|
| - and parallel_composite_upload_threshold > 0
|
| - and file_size >= parallel_composite_upload_threshold
|
| - and file_size >= MIN_PARALLEL_COMPOSITE_FILE_SIZE)
|
| -
|
| - def _ExpandDstUri(self, dst_uri_str):
|
| - """
|
| - Expands wildcard if present in dst_uri_str.
|
| -
|
| - Args:
|
| - dst_uri_str: String representation of requested dst_uri.
|
| -
|
| - Returns:
|
| - (exp_dst_uri, have_existing_dst_container)
|
| - where have_existing_dst_container is a bool indicating whether
|
| - exp_dst_uri names an existing directory, bucket, or bucket subdirectory.
|
| -
|
| - Raises:
|
| - CommandException: if dst_uri_str matched more than 1 URI.
|
| - """
|
| - dst_uri = self.suri_builder.StorageUri(dst_uri_str)
|
| -
|
| - # Handle wildcarded dst_uri case.
|
| - if ContainsWildcard(dst_uri):
|
| - blr_expansion = list(self.WildcardIterator(dst_uri))
|
| - if len(blr_expansion) != 1:
|
| - raise CommandException('Destination (%s) must match exactly 1 URI' %
|
| - dst_uri_str)
|
| - blr = blr_expansion[0]
|
| - uri = blr.GetUri()
|
| - if uri.is_cloud_uri():
|
| - return (uri, uri.names_bucket() or blr.HasPrefix()
|
| - or blr.GetKey().name.endswith('/'))
|
| - else:
|
| - return (uri, uri.names_directory())
|
| -
|
| - # Handle non-wildcarded dst_uri:
|
| - if dst_uri.is_file_uri():
|
| - return (dst_uri, dst_uri.names_directory())
|
| - if dst_uri.names_bucket():
|
| - return (dst_uri, True)
|
| - # For object URIs check 3 cases: (a) if the name ends with '/' treat as a
|
| - # subdir; else, perform a wildcard expansion with dst_uri + "*" and then
|
| - # find if (b) there's a Prefix matching dst_uri, or (c) name is of form
|
| - # dir_$folder$ (and in both these cases also treat dir as a subdir).
|
| - if dst_uri.is_cloud_uri() and dst_uri_str.endswith('/'):
|
| - return (dst_uri, True)
|
| - blr_expansion = list(self.WildcardIterator(
|
| - '%s*' % dst_uri_str.rstrip(dst_uri.delim)))
|
| - for blr in blr_expansion:
|
| - if blr.GetRStrippedUriString().endswith('_$folder$'):
|
| - return (dst_uri, True)
|
| - if blr.GetRStrippedUriString() == dst_uri_str.rstrip(dst_uri.delim):
|
| - return (dst_uri, blr.HasPrefix())
|
| - return (dst_uri, False)
|
| -
|
| - def _ConstructDstUri(self, src_uri, exp_src_uri,
|
| - src_uri_names_container, src_uri_expands_to_multi,
|
| - have_multiple_srcs, exp_dst_uri,
|
| - have_existing_dest_subdir):
|
| - """
|
| - Constructs the destination URI for a given exp_src_uri/exp_dst_uri pair,
|
| - using context-dependent naming rules that mimic Linux cp and mv behavior.
|
| -
|
| - Args:
|
| - src_uri: src_uri to be copied.
|
| - exp_src_uri: Single StorageUri from wildcard expansion of src_uri.
|
| - src_uri_names_container: True if src_uri names a container (including the
|
| - case of a wildcard-named bucket subdir (like gs://bucket/abc,
|
| - where gs://bucket/abc/* matched some objects). Note that this is
|
| - additional semantics tha src_uri.names_container() doesn't understand
|
| - because the latter only understands StorageUris, not wildcards.
|
| - src_uri_expands_to_multi: True if src_uri expanded to multiple URIs.
|
| - have_multiple_srcs: True if this is a multi-source request. This can be
|
| - true if src_uri wildcard-expanded to multiple URIs or if there were
|
| - multiple source URIs in the request.
|
| - exp_dst_uri: the expanded StorageUri requested for the cp destination.
|
| - Final written path is constructed from this plus a context-dependent
|
| - variant of src_uri.
|
| - have_existing_dest_subdir: bool indicator whether dest is an existing
|
| - subdirectory.
|
| -
|
| - Returns:
|
| - StorageUri to use for copy.
|
| -
|
| - Raises:
|
| - CommandException if destination object name not specified for
|
| - source and source is a stream.
|
| - """
|
| - if self._ShouldTreatDstUriAsSingleton(
|
| - have_multiple_srcs, have_existing_dest_subdir, exp_dst_uri):
|
| - # We're copying one file or object to one file or object.
|
| - return exp_dst_uri
|
| -
|
| - if exp_src_uri.is_stream():
|
| - if exp_dst_uri.names_container():
|
| - raise CommandException('Destination object name needed when '
|
| - 'source is a stream')
|
| - return exp_dst_uri
|
| -
|
| - if not self.recursion_requested and not have_multiple_srcs:
|
| - # We're copying one file or object to a subdirectory. Append final comp
|
| - # of exp_src_uri to exp_dst_uri.
|
| - src_final_comp = exp_src_uri.object_name.rpartition(src_uri.delim)[-1]
|
| - return self.suri_builder.StorageUri('%s%s%s' % (
|
| - exp_dst_uri.uri.rstrip(exp_dst_uri.delim), exp_dst_uri.delim,
|
| - src_final_comp))
|
| -
|
| - # Else we're copying multiple sources to a directory, bucket, or a bucket
|
| - # "sub-directory".
|
| -
|
| - # Ensure exp_dst_uri ends in delim char if we're doing a multi-src copy or
|
| - # a copy to a directory. (The check for copying to a directory needs
|
| - # special-case handling so that the command:
|
| - # gsutil cp gs://bucket/obj dir
|
| - # will turn into file://dir/ instead of file://dir -- the latter would cause
|
| - # the file "dirobj" to be created.)
|
| - # Note: need to check have_multiple_srcs or src_uri.names_container()
|
| - # because src_uri could be a bucket containing a single object, named
|
| - # as gs://bucket.
|
| - if ((have_multiple_srcs or src_uri.names_container()
|
| - or os.path.isdir(exp_dst_uri.object_name))
|
| - and not exp_dst_uri.uri.endswith(exp_dst_uri.delim)):
|
| - exp_dst_uri = exp_dst_uri.clone_replace_name(
|
| - '%s%s' % (exp_dst_uri.object_name, exp_dst_uri.delim)
|
| - )
|
| -
|
| - # Making naming behavior match how things work with local Linux cp and mv
|
| - # operations depends on many factors, including whether the destination is a
|
| - # container, the plurality of the source(s), and whether the mv command is
|
| - # being used:
|
| - # 1. For the "mv" command that specifies a non-existent destination subdir,
|
| - # renaming should occur at the level of the src subdir, vs appending that
|
| - # subdir beneath the dst subdir like is done for copying. For example:
|
| - # gsutil rm -R gs://bucket
|
| - # gsutil cp -R dir1 gs://bucket
|
| - # gsutil cp -R dir2 gs://bucket/subdir1
|
| - # gsutil mv gs://bucket/subdir1 gs://bucket/subdir2
|
| - # would (if using cp naming behavior) end up with paths like:
|
| - # gs://bucket/subdir2/subdir1/dir2/.svn/all-wcprops
|
| - # whereas mv naming behavior should result in:
|
| - # gs://bucket/subdir2/dir2/.svn/all-wcprops
|
| - # 2. Copying from directories, buckets, or bucket subdirs should result in
|
| - # objects/files mirroring the source directory hierarchy. For example:
|
| - # gsutil cp dir1/dir2 gs://bucket
|
| - # should create the object gs://bucket/dir2/file2, assuming dir1/dir2
|
| - # contains file2).
|
| - # To be consistent with Linux cp behavior, there's one more wrinkle when
|
| - # working with subdirs: The resulting object names depend on whether the
|
| - # destination subdirectory exists. For example, if gs://bucket/subdir
|
| - # exists, the command:
|
| - # gsutil cp -R dir1/dir2 gs://bucket/subdir
|
| - # should create objects named like gs://bucket/subdir/dir2/a/b/c. In
|
| - # contrast, if gs://bucket/subdir does not exist, this same command
|
| - # should create objects named like gs://bucket/subdir/a/b/c.
|
| - # 3. Copying individual files or objects to dirs, buckets or bucket subdirs
|
| - # should result in objects/files named by the final source file name
|
| - # component. Example:
|
| - # gsutil cp dir1/*.txt gs://bucket
|
| - # should create the objects gs://bucket/f1.txt and gs://bucket/f2.txt,
|
| - # assuming dir1 contains f1.txt and f2.txt.
|
| -
|
| - if (self.perform_mv and self.recursion_requested
|
| - and src_uri_expands_to_multi and not have_existing_dest_subdir):
|
| - # Case 1. Handle naming rules for bucket subdir mv. Here we want to
|
| - # line up the src_uri against its expansion, to find the base to build
|
| - # the new name. For example, running the command:
|
| - # gsutil mv gs://bucket/abcd gs://bucket/xyz
|
| - # when processing exp_src_uri=gs://bucket/abcd/123
|
| - # exp_src_uri_tail should become /123
|
| - # Note: mv.py code disallows wildcard specification of source URI.
|
| - exp_src_uri_tail = exp_src_uri.uri[len(src_uri.uri):]
|
| - dst_key_name = '%s/%s' % (exp_dst_uri.object_name.rstrip('/'),
|
| - exp_src_uri_tail.strip('/'))
|
| - return exp_dst_uri.clone_replace_name(dst_key_name)
|
| -
|
| - if src_uri_names_container and not exp_dst_uri.names_file():
|
| - # Case 2. Build dst_key_name from subpath of exp_src_uri past
|
| - # where src_uri ends. For example, for src_uri=gs://bucket/ and
|
| - # exp_src_uri=gs://bucket/src_subdir/obj, dst_key_name should be
|
| - # src_subdir/obj.
|
| - src_uri_path_sans_final_dir = _GetPathBeforeFinalDir(src_uri)
|
| - if exp_src_uri.is_cloud_uri():
|
| - dst_key_name = exp_src_uri.versionless_uri[
|
| - len(src_uri_path_sans_final_dir):].lstrip(src_uri.delim)
|
| - else:
|
| - dst_key_name = exp_src_uri.uri[
|
| - len(src_uri_path_sans_final_dir):].lstrip(src_uri.delim)
|
| - # Handle case where dst_uri is a non-existent subdir.
|
| - if not have_existing_dest_subdir:
|
| - dst_key_name = dst_key_name.partition(src_uri.delim)[-1]
|
| - # Handle special case where src_uri was a directory named with '.' or
|
| - # './', so that running a command like:
|
| - # gsutil cp -r . gs://dest
|
| - # will produce obj names of the form gs://dest/abc instead of
|
| - # gs://dest/./abc.
|
| - if dst_key_name.startswith('.%s' % os.sep):
|
| - dst_key_name = dst_key_name[2:]
|
| -
|
| - else:
|
| - # Case 3.
|
| - dst_key_name = exp_src_uri.object_name.rpartition(src_uri.delim)[-1]
|
| -
|
| - if (exp_dst_uri.is_file_uri()
|
| - or self._ShouldTreatDstUriAsBucketSubDir(
|
| - have_multiple_srcs, exp_dst_uri, have_existing_dest_subdir)):
|
| - if exp_dst_uri.object_name.endswith(exp_dst_uri.delim):
|
| - dst_key_name = '%s%s%s' % (
|
| - exp_dst_uri.object_name.rstrip(exp_dst_uri.delim),
|
| - exp_dst_uri.delim, dst_key_name)
|
| - else:
|
| - delim = exp_dst_uri.delim if exp_dst_uri.object_name else ''
|
| - dst_key_name = '%s%s%s' % (exp_dst_uri.object_name, delim, dst_key_name)
|
| -
|
| - return exp_dst_uri.clone_replace_name(dst_key_name)
|
| -
|
| - def _FixWindowsNaming(self, src_uri, dst_uri):
|
| - """
|
| - Rewrites the destination URI built by _ConstructDstUri() to translate
|
| - Windows pathnames to cloud pathnames if needed.
|
| -
|
| - Args:
|
| - src_uri: Source URI to be copied.
|
| - dst_uri: The destination URI built by _ConstructDstUri().
|
| -
|
| - Returns:
|
| - StorageUri to use for copy.
|
| - """
|
| - if (src_uri.is_file_uri() and src_uri.delim == '\\'
|
| - and dst_uri.is_cloud_uri()):
|
| - trans_uri_str = re.sub(r'\\', '/', dst_uri.uri)
|
| - dst_uri = self.suri_builder.StorageUri(trans_uri_str)
|
| - return dst_uri
|
| -
|
| - def _CopyFunc(self, name_expansion_result):
|
| + # pylint: disable=too-many-statements
|
| + def CopyFunc(self, name_expansion_result, thread_state=None):
|
| """Worker function for performing the actual copy (and rm, for mv)."""
|
| - (exp_dst_uri, have_existing_dst_container) = self._ExpandDstUri(
|
| - self.args[-1])
|
| - if self.perform_mv:
|
| + gsutil_api = GetCloudApiInstance(self, thread_state=thread_state)
|
| +
|
| + copy_helper_opts = copy_helper.GetCopyHelperOpts()
|
| + if copy_helper_opts.perform_mv:
|
| cmd_name = 'mv'
|
| else:
|
| cmd_name = self.command_name
|
| - src_uri = self.suri_builder.StorageUri(
|
| - name_expansion_result.GetSrcUriStr())
|
| - exp_src_uri = self.suri_builder.StorageUri(
|
| - name_expansion_result.GetExpandedUriStr())
|
| - src_uri_names_container = name_expansion_result.NamesContainer()
|
| - src_uri_expands_to_multi = name_expansion_result.NamesContainer()
|
| - have_multiple_srcs = name_expansion_result.IsMultiSrcRequest()
|
| - have_existing_dest_subdir = (
|
| - name_expansion_result.HaveExistingDstContainer())
|
| - if src_uri.names_provider():
|
| + src_url = name_expansion_result.source_storage_url
|
| + exp_src_url = name_expansion_result.expanded_storage_url
|
| + src_url_names_container = name_expansion_result.names_container
|
| + have_multiple_srcs = name_expansion_result.is_multi_source_request
|
| +
|
| + if src_url.IsCloudUrl() and src_url.IsProvider():
|
| raise CommandException(
|
| - 'The %s command does not allow provider-only source URIs (%s)' %
|
| - (cmd_name, src_uri))
|
| + 'The %s command does not allow provider-only source URLs (%s)' %
|
| + (cmd_name, src_url))
|
| if have_multiple_srcs:
|
| - self._InsistDstUriNamesContainer(exp_dst_uri,
|
| - have_existing_dst_container,
|
| - cmd_name)
|
| + copy_helper.InsistDstUrlNamesContainer(
|
| + self.exp_dst_url, self.have_existing_dst_container, cmd_name)
|
|
|
| + # Various GUI tools (like the GCS web console) create placeholder objects
|
| + # ending with '/' when the user creates an empty directory. Normally these
|
| + # tools should delete those placeholders once objects have been written
|
| + # "under" the directory, but sometimes the placeholders are left around. We
|
| + # need to filter them out here, otherwise if the user tries to rsync from
|
| + # GCS to a local directory it will result in a directory/file conflict
|
| + # (e.g., trying to download an object called "mydata/" where the local
|
| + # directory "mydata" exists).
|
| + if IsCloudSubdirPlaceholder(exp_src_url):
|
| + self.logger.info('Skipping cloud sub-directory placeholder object %s',
|
| + exp_src_url)
|
| + return
|
|
|
| - if self.use_manifest and self.manifest.WasSuccessful(str(exp_src_uri)):
|
| + if copy_helper_opts.use_manifest and self.manifest.WasSuccessful(
|
| + exp_src_url.url_string):
|
| return
|
|
|
| - if self.perform_mv:
|
| - if name_expansion_result.NamesContainer():
|
| + if copy_helper_opts.perform_mv:
|
| + if name_expansion_result.names_container:
|
| # Use recursion_requested when performing name expansion for the
|
| - # directory mv case so we can determine if any of the source URIs are
|
| + # directory mv case so we can determine if any of the source URLs are
|
| # directories (and then use cp -R and rm -R to perform the move, to
|
| # match the behavior of Linux mv (which when moving a directory moves
|
| # all the contained files).
|
| self.recursion_requested = True
|
| - # Disallow wildcard src URIs when moving directories, as supporting it
|
| + # Disallow wildcard src URLs when moving directories, as supporting it
|
| # would make the name transformation too complex and would also be
|
| # dangerous (e.g., someone could accidentally move many objects to the
|
| # wrong name, or accidentally overwrite many objects).
|
| - if ContainsWildcard(src_uri):
|
| + if ContainsWildcard(src_url.url_string):
|
| raise CommandException('The mv command disallows naming source '
|
| 'directories using wildcards')
|
|
|
| - if (exp_dst_uri.is_file_uri()
|
| - and not os.path.exists(exp_dst_uri.object_name)
|
| + if (self.exp_dst_url.IsFileUrl()
|
| + and not os.path.exists(self.exp_dst_url.object_name)
|
| and have_multiple_srcs):
|
| - os.makedirs(exp_dst_uri.object_name)
|
| + os.makedirs(self.exp_dst_url.object_name)
|
|
|
| - dst_uri = self._ConstructDstUri(src_uri, exp_src_uri,
|
| - src_uri_names_container,
|
| - src_uri_expands_to_multi,
|
| - have_multiple_srcs, exp_dst_uri,
|
| - have_existing_dest_subdir)
|
| - dst_uri = self._FixWindowsNaming(src_uri, dst_uri)
|
| + dst_url = copy_helper.ConstructDstUrl(
|
| + src_url, exp_src_url, src_url_names_container, have_multiple_srcs,
|
| + self.exp_dst_url, self.have_existing_dst_container,
|
| + self.recursion_requested)
|
| + dst_url = copy_helper.FixWindowsNaming(src_url, dst_url)
|
|
|
| - self._CheckForDirFileConflict(exp_src_uri, dst_uri)
|
| - if self._SrcDstSame(exp_src_uri, dst_uri):
|
| + copy_helper.CheckForDirFileConflict(exp_src_url, dst_url)
|
| + if copy_helper.SrcDstSame(exp_src_url, dst_url):
|
| raise CommandException('%s: "%s" and "%s" are the same file - '
|
| - 'abort.' % (cmd_name, exp_src_uri, dst_uri))
|
| + 'abort.' % (cmd_name, exp_src_url, dst_url))
|
|
|
| - if dst_uri.is_cloud_uri() and dst_uri.is_version_specific:
|
| - raise CommandException('%s: a version-specific URI\n(%s)\ncannot be '
|
| + if dst_url.IsCloudUrl() and dst_url.HasGeneration():
|
| + raise CommandException('%s: a version-specific URL\n(%s)\ncannot be '
|
| 'the destination for gsutil cp - abort.'
|
| - % (cmd_name, dst_uri))
|
| + % (cmd_name, dst_url))
|
|
|
| elapsed_time = bytes_transferred = 0
|
| try:
|
| - if self.use_manifest:
|
| - self.manifest.Initialize(exp_src_uri, dst_uri)
|
| - (elapsed_time, bytes_transferred, result_uri) = (
|
| - self._PerformCopy(exp_src_uri, dst_uri))
|
| - if self.use_manifest:
|
| - if hasattr(dst_uri, 'md5'):
|
| - self.manifest.Set(exp_src_uri, 'md5', dst_uri.md5)
|
| - self.manifest.SetResult(exp_src_uri, bytes_transferred, 'OK')
|
| + if copy_helper_opts.use_manifest:
|
| + self.manifest.Initialize(
|
| + exp_src_url.url_string, dst_url.url_string)
|
| + (elapsed_time, bytes_transferred, result_url, md5) = (
|
| + copy_helper.PerformCopy(
|
| + self.logger, exp_src_url, dst_url, gsutil_api,
|
| + self, _CopyExceptionHandler, allow_splitting=True,
|
| + headers=self.headers, manifest=self.manifest,
|
| + gzip_exts=self.gzip_exts, test_method=self.test_method))
|
| + if copy_helper_opts.use_manifest:
|
| + if md5:
|
| + self.manifest.Set(exp_src_url.url_string, 'md5', md5)
|
| + self.manifest.SetResult(
|
| + exp_src_url.url_string, bytes_transferred, 'OK')
|
| + if copy_helper_opts.print_ver:
|
| + # Some cases don't return a version-specific URL (e.g., if destination
|
| + # is a file).
|
| + self.logger.info('Created: %s', result_url)
|
| except ItemExistsError:
|
| - message = 'Skipping existing item: %s' % dst_uri.uri
|
| + message = 'Skipping existing item: %s' % dst_url
|
| self.logger.info(message)
|
| - if self.use_manifest:
|
| - self.manifest.SetResult(exp_src_uri, 0, 'skip', message)
|
| + if copy_helper_opts.use_manifest:
|
| + self.manifest.SetResult(exp_src_url.url_string, 0, 'skip', message)
|
| except Exception, e:
|
| - if self._IsNoClobberServerException(e):
|
| - message = 'Rejected (noclobber): %s' % dst_uri.uri
|
| + if (copy_helper_opts.no_clobber and
|
| + copy_helper.IsNoClobberServerException(e)):
|
| + message = 'Rejected (noclobber): %s' % dst_url
|
| self.logger.info(message)
|
| - if self.use_manifest:
|
| - self.manifest.SetResult(exp_src_uri, 0, 'skip', message)
|
| + if copy_helper_opts.use_manifest:
|
| + self.manifest.SetResult(
|
| + exp_src_url.url_string, 0, 'skip', message)
|
| elif self.continue_on_error:
|
| - message = 'Error copying %s: %s' % (src_uri.uri, str(e))
|
| - self.copy_failure_count += 1
|
| + message = 'Error copying %s: %s' % (src_url, str(e))
|
| + self.op_failure_count += 1
|
| self.logger.error(message)
|
| - if self.use_manifest:
|
| - self.manifest.SetResult(exp_src_uri, 0, 'error', message)
|
| + if copy_helper_opts.use_manifest:
|
| + self.manifest.SetResult(
|
| + exp_src_url.url_string, 0, 'error',
|
| + RemoveCRLFFromString(message))
|
| else:
|
| - if self.use_manifest:
|
| - self.manifest.SetResult(exp_src_uri, 0, 'error', str(e))
|
| + if copy_helper_opts.use_manifest:
|
| + self.manifest.SetResult(
|
| + exp_src_url.url_string, 0, 'error', str(e))
|
| raise
|
|
|
| - if self.print_ver:
|
| - # Some cases don't return a version-specific URI (e.g., if destination
|
| - # is a file).
|
| - if hasattr(result_uri, 'version_specific_uri'):
|
| - self.logger.info('Created: %s' % result_uri.version_specific_uri)
|
| - else:
|
| - self.logger.info('Created: %s' % result_uri.uri)
|
| -
|
| # TODO: If we ever use -n (noclobber) with -M (move) (not possible today
|
| # since we call copy internally from move and don't specify the -n flag)
|
| # we'll need to only remove the source when we have not skipped the
|
| # destination.
|
| - if self.perform_mv:
|
| - self.logger.info('Removing %s...', exp_src_uri)
|
| - exp_src_uri.delete_key(validate=False, headers=self.headers)
|
| + if copy_helper_opts.perform_mv:
|
| + self.logger.info('Removing %s...', exp_src_url)
|
| + if exp_src_url.IsCloudUrl():
|
| + gsutil_api.DeleteObject(exp_src_url.bucket_name,
|
| + exp_src_url.object_name,
|
| + generation=exp_src_url.generation,
|
| + provider=exp_src_url.scheme)
|
| + else:
|
| + os.unlink(exp_src_url.object_name)
|
| +
|
| with self.stats_lock:
|
| self.total_elapsed_time += elapsed_time
|
| self.total_bytes_transferred += bytes_transferred
|
|
|
| # Command entry point.
|
| def RunCommand(self):
|
| - self._ParseArgs()
|
| + copy_helper_opts = self._ParseOpts()
|
|
|
| self.total_elapsed_time = self.total_bytes_transferred = 0
|
| if self.args[-1] == '-' or self.args[-1] == 'file://-':
|
| - self._HandleStreamingDownload()
|
| - return 0
|
| + return CatHelper(self).CatUrlStrings(self.args[:-1])
|
|
|
| - if self.read_args_from_stdin:
|
| + if copy_helper_opts.read_args_from_stdin:
|
| if len(self.args) != 1:
|
| - raise CommandException('Source URIs cannot be specified with -I option')
|
| - uri_strs = self._StdinIterator()
|
| + raise CommandException('Source URLs cannot be specified with -I option')
|
| + url_strs = copy_helper.StdinIterator()
|
| else:
|
| if len(self.args) < 2:
|
| raise CommandException('Wrong number of arguments for "cp" command.')
|
| - uri_strs = self.args[:-1]
|
| + url_strs = self.args[:-1]
|
|
|
| - (exp_dst_uri, have_existing_dst_container) = self._ExpandDstUri(
|
| - self.args[-1])
|
| + (self.exp_dst_url, self.have_existing_dst_container) = (
|
| + copy_helper.ExpandUrlToSingleBlr(self.args[-1], self.gsutil_api,
|
| + self.debug, self.project_id))
|
| +
|
| # If the destination bucket has versioning enabled iterate with
|
| # all_versions=True. That way we'll copy all versions if the source bucket
|
| # is versioned; and by leaving all_versions=False if the destination bucket
|
| # has versioning disabled we will avoid copying old versions all to the same
|
| # un-versioned destination object.
|
| + all_versions = False
|
| try:
|
| - all_versions = (exp_dst_uri.names_bucket()
|
| - and exp_dst_uri.get_versioning_config(self.headers))
|
| - except GSResponseError as e:
|
| - # This happens if the user doesn't have OWNER access on the bucket (needed
|
| - # to check if versioning is enabled). In this case fall back to copying
|
| - # all versions (which can be inefficient for the reason noted in the
|
| - # comment above). We don't try to warn the user because that would result
|
| - # in false positive warnings (since we can't check if versioning is
|
| - # enabled on the destination bucket).
|
| - if e.status == 403:
|
| + bucket = self._GetBucketWithVersioningConfig(self.exp_dst_url)
|
| + if bucket and bucket.versioning and bucket.versioning.enabled:
|
| all_versions = True
|
| - else:
|
| - raise
|
| + except AccessDeniedException:
|
| + # This happens (in the XML API only) if the user doesn't have OWNER access
|
| + # on the bucket (needed to check if versioning is enabled). In this case
|
| + # fall back to copying all versions (which can be inefficient for the
|
| + # reason noted in the comment above). We don't try to warn the user
|
| + # because that would result in false positive warnings (since we can't
|
| + # check if versioning is enabled on the destination bucket).
|
| + #
|
| + # For JSON, we will silently not return versioning if we don't have
|
| + # access.
|
| + all_versions = True
|
| +
|
| name_expansion_iterator = NameExpansionIterator(
|
| - self.command_name, self.proj_id_handler, self.headers, self.debug,
|
| - self.logger, self.bucket_storage_uri_class, uri_strs,
|
| - self.recursion_requested or self.perform_mv,
|
| - have_existing_dst_container=have_existing_dst_container,
|
| - all_versions=all_versions)
|
| - self.have_existing_dst_container = have_existing_dst_container
|
| + self.command_name, self.debug,
|
| + self.logger, self.gsutil_api, url_strs,
|
| + self.recursion_requested or copy_helper_opts.perform_mv,
|
| + project_id=self.project_id, all_versions=all_versions,
|
| + continue_on_error=self.continue_on_error or self.parallel_operations)
|
|
|
| # Use a lock to ensure accurate statistics in the face of
|
| # multi-threading/multi-processing.
|
| self.stats_lock = CreateLock()
|
|
|
| # Tracks if any copies failed.
|
| - self.copy_failure_count = 0
|
| + self.op_failure_count = 0
|
|
|
| # Start the clock.
|
| start_time = time.time()
|
|
|
| # Tuple of attributes to share/manage across multiple processes in
|
| # parallel (-m) mode.
|
| - shared_attrs = ('copy_failure_count', 'total_bytes_transferred')
|
| + shared_attrs = ('op_failure_count', 'total_bytes_transferred')
|
|
|
| # Perform copy requests in parallel (-m) mode, if requested, using
|
| # configured number of parallel processes and threads. Otherwise,
|
| # perform requests with sequential function calls in current process.
|
| self.Apply(_CopyFuncWrapper, name_expansion_iterator,
|
| - _CopyExceptionHandler, shared_attrs, fail_on_error=True)
|
| + _CopyExceptionHandler, shared_attrs,
|
| + fail_on_error=(not self.continue_on_error))
|
| self.logger.debug(
|
| 'total_bytes_transferred: %d', self.total_bytes_transferred)
|
|
|
| @@ -2292,621 +833,122 @@
|
|
|
| if self.debug == 3:
|
| # Note that this only counts the actual GET and PUT bytes for the copy
|
| - # - not any transfers for doing wildcard expansion, the initial HEAD
|
| - # request boto performs when doing a bucket.get_key() operation, etc.
|
| + # - not any transfers for doing wildcard expansion, the initial
|
| + # HEAD/GET request performed to get the object metadata, etc.
|
| if self.total_bytes_transferred != 0:
|
| self.logger.info(
|
| 'Total bytes copied=%d, total elapsed time=%5.3f secs (%sps)',
|
| self.total_bytes_transferred, self.total_elapsed_time,
|
| MakeHumanReadable(self.total_bytes_per_second))
|
| - if self.copy_failure_count:
|
| - plural_str = ''
|
| - if self.copy_failure_count > 1:
|
| - plural_str = 's'
|
| + if self.op_failure_count:
|
| + plural_str = 's' if self.op_failure_count > 1 else ''
|
| raise CommandException('%d file%s/object%s could not be transferred.' % (
|
| - self.copy_failure_count, plural_str, plural_str))
|
| + self.op_failure_count, plural_str, plural_str))
|
|
|
| return 0
|
|
|
| - def _ParseArgs(self):
|
| - self.perform_mv = False
|
| + def _ParseOpts(self):
|
| + perform_mv = False
|
| + # exclude_symlinks is handled by Command parent class, so save in Command
|
| + # state rather than CopyHelperOpts.
|
| self.exclude_symlinks = False
|
| - self.no_clobber = False
|
| + no_clobber = False
|
| + # continue_on_error is handled by Command parent class, so save in Command
|
| + # state rather than CopyHelperOpts.
|
| self.continue_on_error = False
|
| - self.daisy_chain = False
|
| - self.read_args_from_stdin = False
|
| - self.print_ver = False
|
| - self.use_manifest = False
|
| + daisy_chain = False
|
| + read_args_from_stdin = False
|
| + print_ver = False
|
| + use_manifest = False
|
| + preserve_acl = False
|
| + canned_acl = None
|
| + # canned_acl is handled by a helper function in parent
|
| + # Command class, so save in Command state rather than CopyHelperOpts.
|
| + self.canned = None
|
|
|
| + # Files matching these extensions should be gzipped before uploading.
|
| + self.gzip_exts = []
|
| +
|
| + # Test hook for stopping transfers.
|
| + halt_at_byte = None
|
| +
|
| # self.recursion_requested initialized in command.py (so can be checked
|
| # in parent class for all commands).
|
| + self.manifest = None
|
| if self.sub_opts:
|
| for o, a in self.sub_opts:
|
| + if o == '-a':
|
| + canned_acl = a
|
| + self.canned = True
|
| if o == '-c':
|
| self.continue_on_error = True
|
| elif o == '-D':
|
| - self.daisy_chain = True
|
| + daisy_chain = True
|
| elif o == '-e':
|
| self.exclude_symlinks = True
|
| + elif o == '--haltatbyte':
|
| + halt_at_byte = long(a)
|
| elif o == '-I':
|
| - self.read_args_from_stdin = True
|
| + read_args_from_stdin = True
|
| elif o == '-L':
|
| - self.use_manifest = True
|
| - self.manifest = _Manifest(a)
|
| + use_manifest = True
|
| + self.manifest = Manifest(a)
|
| elif o == '-M':
|
| # Note that we signal to the cp command to perform a move (copy
|
| # followed by remove) and use directory-move naming rules by passing
|
| # the undocumented (for internal use) -M option when running the cp
|
| # command from mv.py.
|
| - self.perform_mv = True
|
| + perform_mv = True
|
| elif o == '-n':
|
| - self.no_clobber = True
|
| - elif o == '-q':
|
| - self.logger.warning(
|
| - 'Warning: gsutil cp -q is deprecated, and will be removed in the '
|
| - 'future.\nPlease use gsutil -q cp ... instead.')
|
| - self.logger.setLevel(level=logging.WARNING)
|
| + no_clobber = True
|
| + elif o == '-p':
|
| + preserve_acl = True
|
| elif o == '-r' or o == '-R':
|
| self.recursion_requested = True
|
| elif o == '-v':
|
| - self.print_ver = True
|
| + print_ver = True
|
| + elif o == '-z':
|
| + self.gzip_exts = [x.strip() for x in a.split(',')]
|
| + if preserve_acl and canned_acl:
|
| + raise CommandException(
|
| + 'Specifying both the -p and -a options together is invalid.')
|
| + return CreateCopyHelperOpts(
|
| + perform_mv=perform_mv,
|
| + no_clobber=no_clobber,
|
| + daisy_chain=daisy_chain,
|
| + read_args_from_stdin=read_args_from_stdin,
|
| + print_ver=print_ver,
|
| + use_manifest=use_manifest,
|
| + preserve_acl=preserve_acl,
|
| + canned_acl=canned_acl,
|
| + halt_at_byte=halt_at_byte)
|
|
|
| - def _HandleStreamingDownload(self):
|
| - # Destination is <STDOUT>. Manipulate sys.stdout so as to redirect all
|
| - # debug messages to <STDERR>.
|
| - stdout_fp = sys.stdout
|
| - sys.stdout = sys.stderr
|
| - did_some_work = False
|
| - for uri_str in self.args[0:len(self.args)-1]:
|
| - for uri in self.WildcardIterator(uri_str).IterUris():
|
| - did_some_work = True
|
| - key = uri.get_key(False, self.headers)
|
| - (elapsed_time, bytes_transferred) = self._PerformDownloadToStream(
|
| - key, uri, stdout_fp, self.headers)
|
| - self.total_elapsed_time += elapsed_time
|
| - self.total_bytes_transferred += bytes_transferred
|
| - if not did_some_work:
|
| - raise CommandException('No URIs matched')
|
| - if self.debug == 3:
|
| - if self.total_bytes_transferred != 0:
|
| - self.logger.info(
|
| - 'Total bytes copied=%d, total elapsed time=%5.3f secs (%sps)',
|
| - self.total_bytes_transferred, self.total_elapsed_time,
|
| - MakeHumanReadable(float(self.total_bytes_transferred) /
|
| - float(self.total_elapsed_time)))
|
| + def _GetBucketWithVersioningConfig(self, exp_dst_url):
|
| + """Gets versioning config for a bucket and ensures that it exists.
|
|
|
| - def _StdinIterator(self):
|
| - """A generator function that returns lines from stdin."""
|
| - for line in sys.stdin:
|
| - # Strip CRLF.
|
| - yield line.rstrip()
|
| -
|
| - def _SrcDstSame(self, src_uri, dst_uri):
|
| - """Checks if src_uri and dst_uri represent the same object or file.
|
| -
|
| - We don't handle anything about hard or symbolic links.
|
| -
|
| Args:
|
| - src_uri: Source StorageUri.
|
| - dst_uri: Destination StorageUri.
|
| + exp_dst_url: Wildcard-expanded destination StorageUrl.
|
|
|
| - Returns:
|
| - Bool indicator.
|
| - """
|
| - if src_uri.is_file_uri() and dst_uri.is_file_uri():
|
| - # Translate a/b/./c to a/b/c, so src=dst comparison below works.
|
| - new_src_path = os.path.normpath(src_uri.object_name)
|
| - new_dst_path = os.path.normpath(dst_uri.object_name)
|
| - return (src_uri.clone_replace_name(new_src_path).uri ==
|
| - dst_uri.clone_replace_name(new_dst_path).uri)
|
| - else:
|
| - return (src_uri.uri == dst_uri.uri and
|
| - src_uri.generation == dst_uri.generation and
|
| - src_uri.version_id == dst_uri.version_id)
|
| + Raises:
|
| + AccessDeniedException: if there was a permissions problem accessing the
|
| + bucket or its versioning config.
|
| + CommandException: if URL refers to a cloud bucket that does not exist.
|
|
|
| - def _ShouldTreatDstUriAsBucketSubDir(self, have_multiple_srcs, dst_uri,
|
| - have_existing_dest_subdir):
|
| - """
|
| - Checks whether dst_uri should be treated as a bucket "sub-directory". The
|
| - decision about whether something constitutes a bucket "sub-directory"
|
| - depends on whether there are multiple sources in this request and whether
|
| - there is an existing bucket subdirectory. For example, when running the
|
| - command:
|
| - gsutil cp file gs://bucket/abc
|
| - if there's no existing gs://bucket/abc bucket subdirectory we should copy
|
| - file to the object gs://bucket/abc. In contrast, if
|
| - there's an existing gs://bucket/abc bucket subdirectory we should copy
|
| - file to gs://bucket/abc/file. And regardless of whether gs://bucket/abc
|
| - exists, when running the command:
|
| - gsutil cp file1 file2 gs://bucket/abc
|
| - we should copy file1 to gs://bucket/abc/file1 (and similarly for file2).
|
| -
|
| - Note that we don't disallow naming a bucket "sub-directory" where there's
|
| - already an object at that URI. For example it's legitimate (albeit
|
| - confusing) to have an object called gs://bucket/dir and
|
| - then run the command
|
| - gsutil cp file1 file2 gs://bucket/dir
|
| - Doing so will end up with objects gs://bucket/dir, gs://bucket/dir/file1,
|
| - and gs://bucket/dir/file2.
|
| -
|
| - Args:
|
| - have_multiple_srcs: Bool indicator of whether this is a multi-source
|
| - operation.
|
| - dst_uri: StorageUri to check.
|
| - have_existing_dest_subdir: bool indicator whether dest is an existing
|
| - subdirectory.
|
| -
|
| Returns:
|
| - bool indicator.
|
| + apitools Bucket with versioning configuration.
|
| """
|
| - return ((have_multiple_srcs and dst_uri.is_cloud_uri())
|
| - or (have_existing_dest_subdir))
|
| -
|
| - def _ShouldTreatDstUriAsSingleton(self, have_multiple_srcs,
|
| - have_existing_dest_subdir, dst_uri):
|
| - """
|
| - Checks that dst_uri names a singleton (file or object) after
|
| - dir/wildcard expansion. The decision is more nuanced than simply
|
| - dst_uri.names_singleton()) because of the possibility that an object path
|
| - might name a bucket sub-directory.
|
| -
|
| - Args:
|
| - have_multiple_srcs: Bool indicator of whether this is a multi-source
|
| - operation.
|
| - have_existing_dest_subdir: bool indicator whether dest is an existing
|
| - subdirectory.
|
| - dst_uri: StorageUri to check.
|
| -
|
| - Returns:
|
| - bool indicator.
|
| - """
|
| - if have_multiple_srcs:
|
| - # Only a file meets the criteria in this case.
|
| - return dst_uri.names_file()
|
| - return not have_existing_dest_subdir and dst_uri.names_singleton()
|
| -
|
| - def _IsNoClobberServerException(self, e):
|
| - """
|
| - Checks to see if the server attempted to clobber a file after we specified
|
| - in the header that we didn't want the file clobbered.
|
| -
|
| - Args:
|
| - e: The Exception that was generated by a failed copy operation
|
| -
|
| - Returns:
|
| - bool indicator - True indicates that the server did attempt to clobber
|
| - an existing file.
|
| - """
|
| - return self.no_clobber and (
|
| - (isinstance(e, GSResponseError) and e.status==412) or
|
| - (isinstance(e, ResumableUploadException) and 'code 412' in e.message))
|
| -
|
| -
|
| -class _Manifest(object):
|
| - """Stores the manifest items for the CpCommand class."""
|
| -
|
| - def __init__(self, path):
|
| - # self.items contains a dictionary of rows
|
| - self.items = {}
|
| - self.manifest_filter = {}
|
| - self.lock = multiprocessing.Manager().Lock()
|
| -
|
| - self.manifest_path = os.path.expanduser(path)
|
| - self._ParseManifest()
|
| - self._CreateManifestFile()
|
| -
|
| - def _ParseManifest(self):
|
| - """
|
| - Load and parse a manifest file. This information will be used to skip
|
| - any files that have a skip or OK status.
|
| - """
|
| - try:
|
| - if os.path.exists(self.manifest_path):
|
| - with open(self.manifest_path, 'rb') as f:
|
| - first_row = True
|
| - reader = csv.reader(f)
|
| - for row in reader:
|
| - if first_row:
|
| - try:
|
| - source_index = row.index('Source')
|
| - result_index = row.index('Result')
|
| - except ValueError:
|
| - # No header and thus not a valid manifest file.
|
| - raise CommandException(
|
| - 'Missing headers in manifest file: %s' % self.manifest_path)
|
| - first_row = False
|
| - source = row[source_index]
|
| - result = row[result_index]
|
| - if result in ['OK', 'skip']:
|
| - # We're always guaranteed to take the last result of a specific
|
| - # source uri.
|
| - self.manifest_filter[source] = result
|
| - except IOError as ex:
|
| - raise CommandException('Could not parse %s' % path)
|
| -
|
| - def WasSuccessful(self, src):
|
| - """ Returns whether the specified src uri was marked as successful."""
|
| - return src in self.manifest_filter
|
| -
|
| - def _CreateManifestFile(self):
|
| - """Opens the manifest file and assigns it to the file pointer."""
|
| - try:
|
| - if ((not os.path.exists(self.manifest_path))
|
| - or (os.stat(self.manifest_path).st_size == 0)):
|
| - # Add headers to the new file.
|
| - with open(self.manifest_path, 'wb', 1) as f:
|
| - writer = csv.writer(f)
|
| - writer.writerow(['Source',
|
| - 'Destination',
|
| - 'Start',
|
| - 'End',
|
| - 'Md5',
|
| - 'UploadId',
|
| - 'Source Size',
|
| - 'Bytes Transferred',
|
| - 'Result',
|
| - 'Description'])
|
| - except IOError:
|
| - raise CommandException('Could not create manifest file.')
|
| -
|
| - def Set(self, uri, key, value):
|
| - if value is None:
|
| - # In case we don't have any information to set we bail out here.
|
| - # This is so that we don't clobber existing information.
|
| - # To zero information pass '' instead of None.
|
| - return
|
| - if uri in self.items:
|
| - self.items[uri][key] = value
|
| - else:
|
| - self.items[uri] = {key:value}
|
| -
|
| - def Initialize(self, source_uri, destination_uri):
|
| - # Always use the source_uri as the key for the item. This is unique.
|
| - self.Set(source_uri, 'source_uri', source_uri)
|
| - self.Set(source_uri, 'destination_uri', destination_uri)
|
| - self.Set(source_uri, 'start_time', datetime.datetime.utcnow())
|
| -
|
| - def SetResult(self, source_uri, bytes_transferred, result,
|
| - description=''):
|
| - self.Set(source_uri, 'bytes', bytes_transferred)
|
| - self.Set(source_uri, 'result', result)
|
| - self.Set(source_uri, 'description', description)
|
| - self.Set(source_uri, 'end_time', datetime.datetime.utcnow())
|
| - self._WriteRowToManifestFile(source_uri)
|
| - self._RemoveItemFromManifest(source_uri)
|
| -
|
| - def _WriteRowToManifestFile(self, uri):
|
| - row_item = self.items[uri]
|
| - data = [
|
| - str(row_item['source_uri']),
|
| - str(row_item['destination_uri']),
|
| - '%sZ' % row_item['start_time'].isoformat(),
|
| - '%sZ' % row_item['end_time'].isoformat(),
|
| - row_item['md5'] if 'md5' in row_item else '',
|
| - row_item['upload_id'] if 'upload_id' in row_item else '',
|
| - str(row_item['size']) if 'size' in row_item else '',
|
| - str(row_item['bytes']) if 'bytes' in row_item else '',
|
| - row_item['result'],
|
| - row_item['description']]
|
| -
|
| - # Aquire a lock to prevent multiple threads writing to the same file at
|
| - # the same time. This would cause a garbled mess in the manifest file.
|
| - with self.lock:
|
| - with open(self.manifest_path, 'a', 1) as f: # 1 == line buffered
|
| - writer = csv.writer(f)
|
| - writer.writerow(data)
|
| -
|
| - def _RemoveItemFromManifest(self, uri):
|
| - # Remove the item from the dictionary since we're done with it and
|
| - # we don't want the dictionary to grow too large in memory for no good
|
| - # reason.
|
| - del self.items[uri]
|
| -
|
| -
|
| -class ItemExistsError(Exception):
|
| - """Exception class for objects that are skipped because they already exist."""
|
| - pass
|
| -
|
| -
|
| -def _GetPathBeforeFinalDir(uri):
|
| - """
|
| - Returns the part of the path before the final directory component for the
|
| - given URI, handling cases for file system directories, bucket, and bucket
|
| - subdirectories. Example: for gs://bucket/dir/ we'll return 'gs://bucket',
|
| - and for file://dir we'll return file://
|
| -
|
| - Args:
|
| - uri: StorageUri.
|
| -
|
| - Returns:
|
| - String name of above-described path, sans final path separator.
|
| - """
|
| - sep = uri.delim
|
| - # If the source uri argument had a wildcard and wasn't expanded by the
|
| - # shell, then uri.names_file() will always be true, so we check for
|
| - # this case explicitly.
|
| - assert ((not uri.names_file()) or ContainsWildcard(uri.object_name))
|
| - if uri.names_directory():
|
| - past_scheme = uri.uri[len('file://'):]
|
| - if past_scheme.find(sep) == -1:
|
| - return 'file://'
|
| - else:
|
| - return 'file://%s' % past_scheme.rstrip(sep).rpartition(sep)[0]
|
| - if uri.names_bucket():
|
| - return '%s://' % uri.scheme
|
| - # Else it names a bucket subdir.
|
| - return uri.uri.rstrip(sep).rpartition(sep)[0]
|
| -
|
| -def _HashFilename(filename):
|
| - """
|
| - Apply a hash function (SHA1) to shorten the passed file name. The spec
|
| - for the hashed file name is as follows:
|
| -
|
| - TRACKER_<hash>_<trailing>
|
| -
|
| - where hash is a SHA1 hash on the original file name and trailing is
|
| - the last 16 chars from the original file name. Max file name lengths
|
| - vary by operating system so the goal of this function is to ensure
|
| - the hashed version takes fewer than 100 characters.
|
| -
|
| - Args:
|
| - filename: file name to be hashed.
|
| -
|
| - Returns:
|
| - shorter, hashed version of passed file name
|
| - """
|
| - if isinstance(filename, unicode):
|
| - filename = filename.encode('utf-8')
|
| - else:
|
| - filename = unicode(filename, 'utf8').encode('utf-8')
|
| - m = hashlib.sha1(filename)
|
| - return "TRACKER_" + m.hexdigest() + '.' + filename[-16:]
|
| -
|
| -def _DivideAndCeil(dividend, divisor):
|
| - """Returns ceil(dividend / divisor), taking care to avoid the pitfalls of
|
| - floating point arithmetic that could otherwise yield the wrong result
|
| - for large numbers.
|
| - """
|
| - quotient = dividend // divisor
|
| - if (dividend % divisor) != 0:
|
| - quotient += 1
|
| - return quotient
|
| -
|
| -def _GetPartitionInfo(file_size, max_components, default_component_size):
|
| - """
|
| - Args:
|
| - file_size: The number of bytes in the file to be partitioned.
|
| - max_components: The maximum number of components that can be composed.
|
| - default_component_size: The size of a component, assuming that
|
| - max_components is infinite.
|
| - Returns:
|
| - The number of components in the partitioned file, and the size of each
|
| - component (except the last, which will have a different size iff
|
| - file_size != 0 (mod num_components)).
|
| - """
|
| - # num_components = ceil(file_size / default_component_size)
|
| - num_components = _DivideAndCeil(file_size, default_component_size)
|
| -
|
| - # num_components must be in the range [2, max_components]
|
| - num_components = max(min(num_components, max_components), 2)
|
| -
|
| - # component_size = ceil(file_size / num_components)
|
| - component_size = _DivideAndCeil(file_size, num_components)
|
| - return (num_components, component_size)
|
| -
|
| -def _DeleteKeyFn(cls, key):
|
| - """Wrapper function to be used with command.Apply()."""
|
| - return key.delete_key()
|
| -
|
| -def _ParseParallelUploadTrackerFile(tracker_file, tracker_file_lock):
|
| - """Parse the tracker file (if any) from the last parallel composite upload
|
| - attempt. The tracker file is of the format described in
|
| - _CreateParallelUploadTrackerFile. If the file doesn't exist or cannot be
|
| - read, then the upload will start from the beginning.
|
| -
|
| - Args:
|
| - tracker_file: The name of the file to parse.
|
| - tracker_file_lock: Lock protecting access to the tracker file.
|
| -
|
| - Returns:
|
| - random_prefix: A randomly-generated prefix to the name of the
|
| - temporary components.
|
| - existing_objects: A list of ObjectFromTracker objects representing
|
| - the set of files that have already been uploaded.
|
| - """
|
| - existing_objects = []
|
| - try:
|
| - with tracker_file_lock:
|
| - f = open(tracker_file, 'r')
|
| - lines = f.readlines()
|
| - lines = [line.strip() for line in lines]
|
| - f.close()
|
| - except IOError as e:
|
| - # We can't read the tracker file, so generate a new random prefix.
|
| - lines = [str(random.randint(1, (10 ** 10) - 1))]
|
| -
|
| - # Ignore non-existent file (happens first time an upload
|
| - # is attempted on a file), but warn user for other errors.
|
| - if e.errno != errno.ENOENT:
|
| - # Will restart because we failed to read in the file.
|
| - print('Couldn\'t read parallel upload tracker file (%s): %s. '
|
| - 'Restarting upload from scratch.' % (tracker_file, e.strerror))
|
| -
|
| - # The first line contains the randomly-generated prefix.
|
| - random_prefix = lines[0]
|
| -
|
| - # The remaining lines were written in pairs to describe a single component
|
| - # in the form:
|
| - # object_name (without random prefix)
|
| - # generation
|
| - # Newlines are used as the delimiter because only newlines and carriage
|
| - # returns are invalid characters in object names, and users can specify
|
| - # a custom prefix in the config file.
|
| - i = 1
|
| - while i < len(lines):
|
| - (name, generation) = (lines[i], lines[i+1])
|
| - if generation == '':
|
| - generation = None
|
| - existing_objects.append(ObjectFromTracker(name, generation))
|
| - i += 2
|
| - return (random_prefix, existing_objects)
|
| -
|
| -def _AppendComponentTrackerToParallelUploadTrackerFile(tracker_file, component,
|
| - tracker_file_lock):
|
| - """Appends information about the uploaded component to the contents of an
|
| - existing tracker file, following the format described in
|
| - _CreateParallelUploadTrackerFile."""
|
| - lines = _GetParallelUploadTrackerFileLinesForComponents([component])
|
| - lines = [line + '\n' for line in lines]
|
| - with tracker_file_lock:
|
| - with open(tracker_file, 'a') as f:
|
| - f.writelines(lines)
|
| -
|
| -def _CreateParallelUploadTrackerFile(tracker_file, random_prefix, components,
|
| - tracker_file_lock):
|
| - """Writes information about components that were successfully uploaded so
|
| - that the upload can be resumed at a later date. The tracker file has
|
| - the format:
|
| - random_prefix
|
| - temp_object_1_name
|
| - temp_object_1_generation
|
| - .
|
| - .
|
| - .
|
| - temp_object_N_name
|
| - temp_object_N_generation
|
| - where N is the number of components that have been successfully uploaded.
|
| -
|
| - Args:
|
| - tracker_file: The name of the parallel upload tracker file.
|
| - random_prefix: The randomly-generated prefix that was used for
|
| - for uploading any existing components.
|
| - components: A list of ObjectFromTracker objects that were uploaded.
|
| - """
|
| - lines = [random_prefix]
|
| - lines += _GetParallelUploadTrackerFileLinesForComponents(components)
|
| - lines = [line + '\n' for line in lines]
|
| - with tracker_file_lock:
|
| - open(tracker_file, 'w').close() # Clear the file.
|
| - with open(tracker_file, 'w') as f:
|
| - f.writelines(lines)
|
| -
|
| -def _GetParallelUploadTrackerFileLinesForComponents(components):
|
| - """Return a list of the lines that should appear in the parallel composite
|
| - upload tracker file representing the given components, using the format
|
| - as described in _CreateParallelUploadTrackerFile."""
|
| - lines = []
|
| - for component in components:
|
| - generation = None
|
| - generation = component.generation
|
| - if not generation:
|
| - generation = ''
|
| - lines += [component.object_name, generation]
|
| - return lines
|
| -
|
| -def FilterExistingComponents(dst_args, existing_components,
|
| - bucket_name, suri_builder):
|
| - """Given the list of all target objects based on partitioning the file and
|
| - the list of objects that have already been uploaded successfully,
|
| - this function determines which objects should be uploaded, which
|
| - existing components are still valid, and which existing components should
|
| - be deleted.
|
| -
|
| - Args:
|
| - dst_args: The map of file_name -> PerformResumableUploadIfAppliesArgs
|
| - calculated by partitioning the file.
|
| - existing_components: A list of ObjectFromTracker objects that have been
|
| - uploaded in the past.
|
| - bucket_name: The name of the bucket in which the components exist.
|
| -
|
| - Returns:
|
| - components_to_upload: List of components that need to be uploaded.
|
| - uploaded_components: List of components that have already been
|
| - uploaded and are still valid.
|
| - existing_objects_to_delete: List of components that have already
|
| - been uploaded, but are no longer valid
|
| - and are in a versioned bucket, and
|
| - therefore should be deleted.
|
| - """
|
| - components_to_upload = []
|
| - existing_component_names = [component.object_name
|
| - for component in existing_components]
|
| - for component_name in dst_args:
|
| - if not (component_name in existing_component_names):
|
| - components_to_upload.append(dst_args[component_name])
|
| -
|
| - objects_already_chosen = []
|
| -
|
| - # Don't reuse any temporary components whose MD5 doesn't match the current
|
| - # MD5 of the corresponding part of the file. If the bucket is versioned,
|
| - # also make sure that we delete the existing temporary version.
|
| - existing_objects_to_delete = []
|
| - uploaded_components = []
|
| - for tracker_object in existing_components:
|
| - if ((not tracker_object.object_name in dst_args.keys())
|
| - or tracker_object.object_name in objects_already_chosen):
|
| - # This could happen if the component size has changed. This also serves
|
| - # to handle object names that get duplicated in the tracker file due
|
| - # to people doing things they shouldn't (e.g., overwriting an existing
|
| - # temporary component in a versioned bucket).
|
| -
|
| - uri = MakeGsUri(bucket_name, tracker_object.object_name, suri_builder)
|
| - uri.generation = tracker_object.generation
|
| - existing_objects_to_delete.append(uri)
|
| - continue
|
| -
|
| - dst_arg = dst_args[tracker_object.object_name]
|
| - file_part = FilePart(dst_arg.filename, dst_arg.file_start,
|
| - dst_arg.file_length)
|
| - # TODO: calculate MD5's in parallel when possible.
|
| - content_md5 = _CalculateMd5FromContents(file_part)
|
| -
|
| - try:
|
| - # Get the MD5 of the currently-existing component.
|
| - blr = BucketListingRef(dst_arg.dst_uri)
|
| - etag = blr.GetKey().etag
|
| - except Exception as e:
|
| - # We don't actually care what went wrong - we couldn't retrieve the
|
| - # object to check the MD5, so just upload it again.
|
| - etag = None
|
| - if etag != (('"%s"') % content_md5):
|
| - components_to_upload.append(dst_arg)
|
| - objects_already_chosen.append(tracker_object.object_name)
|
| - if tracker_object.generation:
|
| - # If the old object doesn't have a generation (i.e., it isn't in a
|
| - # versioned bucket), then we will just overwrite it anyway.
|
| - invalid_component_with_generation = copy.deepcopy(dst_arg.dst_uri)
|
| - invalid_component_with_generation.generation = tracker_object.generation
|
| - existing_objects_to_delete.append(invalid_component_with_generation)
|
| - else:
|
| - uri = copy.deepcopy(dst_arg.dst_uri)
|
| - uri.generation = tracker_object.generation
|
| - uploaded_components.append(uri)
|
| - objects_already_chosen.append(tracker_object.object_name)
|
| -
|
| - if uploaded_components:
|
| - logging.info(("Found %d existing temporary components to reuse.")
|
| - % len(uploaded_components))
|
| -
|
| - return (components_to_upload, uploaded_components,
|
| - existing_objects_to_delete)
|
| -
|
| -def MakeGsUri(bucket, filename, suri_builder):
|
| - """Returns a StorageUri for an object in GCS."""
|
| - return suri_builder.StorageUri(bucket + '/' + filename)
|
| -
|
| -def _CalculateMd5FromContents(file):
|
| - """Calculates the MD5 hash of the contents of a file.
|
| -
|
| - Args:
|
| - file: An already-open file object.
|
| - """
|
| - current_md5 = md5()
|
| - file.seek(0)
|
| - while True:
|
| - data = file.read(8192)
|
| - if not data:
|
| - break
|
| - current_md5.update(data)
|
| - file.seek(0)
|
| - return current_md5.hexdigest()
|
| + bucket = None
|
| + if exp_dst_url.IsCloudUrl() and exp_dst_url.IsBucket():
|
| + try:
|
| + bucket = self.gsutil_api.GetBucket(
|
| + exp_dst_url.bucket_name, provider=exp_dst_url.scheme,
|
| + fields=['versioning'])
|
| + except AccessDeniedException, e:
|
| + raise
|
| + except NotFoundException, e:
|
| + raise CommandException('Destination bucket %s does not exist.' %
|
| + exp_dst_url)
|
| + except Exception, e:
|
| + raise CommandException('Error retrieving destination bucket %s: %s' %
|
| + (exp_dst_url, e.message))
|
| + return bucket
|
|
|