Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(885)

Unified Diff: gslib/commands/cp.py

Issue 698893003: Update checked in version of gsutil to version 4.6 (Closed) Base URL: http://dart.googlecode.com/svn/third_party/gsutil/
Patch Set: Created 6 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « gslib/commands/cors.py ('k') | gslib/commands/defacl.py » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: gslib/commands/cp.py
===================================================================
--- gslib/commands/cp.py (revision 33376)
+++ gslib/commands/cp.py (working copy)
@@ -1,3 +1,4 @@
+# -*- coding: utf-8 -*-
# Copyright 2011 Google Inc. All Rights Reserved.
# Copyright 2011, Nexenta Systems Inc.
#
@@ -12,155 +13,40 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+"""Implementation of Unix-like cp command for cloud storage providers."""
-# Get the system logging module, not our local logging module.
from __future__ import absolute_import
-import binascii
-import boto
-import copy
-import crcmod
-import csv
-import datetime
-import errno
-import gslib
-import gzip
-import hashlib
-import logging
-import mimetypes
-import mmap
-import multiprocessing
import os
-import platform
-import random
-import re
-import stat
-import subprocess
-import sys
-import tempfile
-import textwrap
-import threading
import time
import traceback
-from gslib.util import AddAcceptEncoding
-
-try:
- from hashlib import md5
-except ImportError:
- from md5 import md5
-
-from boto import config
-from boto.exception import GSResponseError
-from boto.exception import ResumableUploadException
-from boto.gs.resumable_upload_handler import ResumableUploadHandler
-from boto.s3.keyfile import KeyFile
-from boto.s3.resumable_download_handler import ResumableDownloadHandler
-from boto.storage_uri import BucketStorageUri
-from boto.storage_uri import StorageUri
-from collections import namedtuple
-from gslib.bucket_listing_ref import BucketListingRef
-from gslib.command import COMMAND_NAME
-from gslib.command import COMMAND_NAME_ALIASES
+from gslib import copy_helper
+from gslib.cat_helper import CatHelper
+from gslib.cloud_api import AccessDeniedException
+from gslib.cloud_api import NotFoundException
from gslib.command import Command
-from gslib.command import FILE_URIS_OK
-from gslib.command import MAX_ARGS
-from gslib.command import MIN_ARGS
-from gslib.command import PROVIDER_URIS_OK
-from gslib.command import SUPPORTED_SUB_ARGS
-from gslib.command import URIS_START_ARG
from gslib.commands.compose import MAX_COMPONENT_COUNT
-from gslib.commands.compose import MAX_COMPOSE_ARITY
-from gslib.commands.config import DEFAULT_PARALLEL_COMPOSITE_UPLOAD_COMPONENT_SIZE
-from gslib.commands.config import DEFAULT_PARALLEL_COMPOSITE_UPLOAD_THRESHOLD
+from gslib.copy_helper import CreateCopyHelperOpts
+from gslib.copy_helper import ItemExistsError
+from gslib.copy_helper import Manifest
+from gslib.copy_helper import PARALLEL_UPLOAD_TEMP_NAMESPACE
+from gslib.cs_api_map import ApiSelector
from gslib.exception import CommandException
-from gslib.file_part import FilePart
-from gslib.help_provider import HELP_NAME
-from gslib.help_provider import HELP_NAME_ALIASES
-from gslib.help_provider import HELP_ONE_LINE_SUMMARY
-from gslib.help_provider import HELP_TEXT
-from gslib.help_provider import HelpType
-from gslib.help_provider import HELP_TYPE
from gslib.name_expansion import NameExpansionIterator
-from gslib.util import BOTO_IS_SECURE
+from gslib.storage_url import ContainsWildcard
from gslib.util import CreateLock
-from gslib.util import CreateTrackerDirIfNeeded
-from gslib.util import GetConfigFilePath
-from gslib.util import ParseErrorDetail
-from gslib.util import HumanReadableToBytes
-from gslib.util import IS_WINDOWS
+from gslib.util import GetCloudApiInstance
+from gslib.util import IsCloudSubdirPlaceholder
from gslib.util import MakeHumanReadable
from gslib.util import NO_MAX
-from gslib.util import TWO_MB
-from gslib.util import UsingCrcmodExtension
-from gslib.wildcard_iterator import ContainsWildcard
-from gslib.name_expansion import NameExpansionResult
+from gslib.util import RemoveCRLFFromString
-
-SLOW_CRC_WARNING = """
-WARNING: Downloading this composite object requires integrity checking with
-CRC32c, but your crcmod installation isn't using the module's C extension, so
-the the hash computation will likely throttle download performance. For help
-installing the extension, please see:
- $ gsutil help crcmod
-To disable slow integrity checking, see the "check_hashes" option in your boto
-config file.
-"""
-
-SLOW_CRC_EXCEPTION = CommandException(
-"""
-Downloading this composite object requires integrity checking with CRC32c, but
-your crcmod installation isn't using the module's C extension, so the the hash
-computation will likely throttle download performance. For help installing the
-extension, please see:
- $ gsutil help crcmod
-To download regardless of crcmod performance or to skip slow integrity checks,
-see the "check_hashes" option in your boto config file.""")
-
-NO_HASH_CHECK_WARNING = """
-WARNING: This download will not be validated since your crcmod installation
-doesn't use the module's C extension, so the hash computation would likely
-throttle download performance. For help in installing the extension, please see:
- $ gsutil help crcmod
-To force integrity checking, see the "check_hashes" option in your boto config
-file.
-"""
-
-NO_SERVER_HASH_EXCEPTION = CommandException(
-"""
-This object has no server-supplied hash for performing integrity
-checks. To skip integrity checking for such objects, see the "check_hashes"
-option in your boto config file.""")
-
-NO_SERVER_HASH_WARNING = """
-WARNING: This object has no server-supplied hash for performing integrity
-checks. To force integrity checking, see the "check_hashes" option in your boto
-config file.
-"""
-
-PARALLEL_UPLOAD_TEMP_NAMESPACE = (
- u'/gsutil/tmp/parallel_composite_uploads/for_details_see/gsutil_help_cp/')
-
-PARALLEL_UPLOAD_STATIC_SALT = u"""
-PARALLEL_UPLOAD_SALT_TO_PREVENT_COLLISIONS.
-The theory is that no user will have prepended this to the front of
-one of their object names and then done an MD5 hash of the name, and
-then prepended PARALLEL_UPLOAD_TEMP_NAMESPACE to the front of their object
-name. Note that there will be no problems with object name length since we
-hash the original name.
-"""
-
-# In order to prevent people from uploading thousands of tiny files in parallel
-# (which, apart from being useless, is likely to cause them to be throttled
-# for the compose calls), don't allow files smaller than this to use parallel
-# composite uploads.
-MIN_PARALLEL_COMPOSITE_FILE_SIZE = 20971520 # 20 MB
-
SYNOPSIS_TEXT = """
<B>SYNOPSIS</B>
- gsutil cp [OPTION]... src_uri dst_uri
- gsutil cp [OPTION]... src_uri... dst_uri
- gsutil cp [OPTION]... -I dst_uri
+ gsutil cp [OPTION]... src_url dst_url
+ gsutil cp [OPTION]... src_url... dst_url
+ gsutil cp [OPTION]... -I dst_url
"""
DESCRIPTION_TEXT = """
@@ -186,14 +72,19 @@
gsutil -m cp -R dir gs://my_bucket
- You can pass a list of URIs to copy on STDIN instead of as command line
- arguments by using the -I option. This allows you to use gsutil in a
- pipeline to copy files and objects as generated by a program, such as:
+ You can pass a list of URLs (one per line) to copy on STDIN instead of as
+ command line arguments by using the -I option. This allows you to use gsutil
+ in a pipeline to upload or download files / objects as generated by a program,
+ such as:
some_program | gsutil -m cp -I gs://my_bucket
- The contents of STDIN can name files, cloud URIs, and wildcards of files
- and cloud URIs.
+ or:
+
+ some_program | gsutil -m cp -I ./download_dir
+
+ The contents of STDIN can name files, cloud URLs, and wildcards of files
+ and cloud URLs.
"""
NAME_CONSTRUCTION_TEXT = """
@@ -237,6 +128,15 @@
will create objects named like gs://my_bucket/subdir/dir2/a/b/c. In contrast,
if gs://my_bucket/subdir does not exist, this same gsutil cp command will
create objects named like gs://my_bucket/subdir/a/b/c.
+
+ Note: If you use the
+ `Google Developers Console <https://console.developers.google.com>`_
+ to create folders, it does so by creating a "placeholder" object that ends
+ with a "/" character. gsutil skips these objects when downloading from the
+ cloud to the local file system, because attempting to create a file that
+ ends with a "/" is not allowed on Linux and MacOS. Because of this, it is
+ recommended that you not create objects that end with "/" (unless you don't
+ need to be able to download such objects using gsutil).
"""
SUBDIRECTORIES_TEXT = """
@@ -282,11 +182,11 @@
COPY_IN_CLOUD_TEXT = """
<B>COPYING IN THE CLOUD AND METADATA PRESERVATION</B>
- If both the source and destination URI are cloud URIs from the same
+ If both the source and destination URL are cloud URLs from the same
provider, gsutil copies data "in the cloud" (i.e., without downloading
to and uploading from the machine where you run gsutil). In addition to
the performance and cost advantages of doing this, copying in the cloud
- preserves metadata (like Content-Type and Cache-Control). In contrast,
+ preserves metadata (like Content-Type and Cache-Control). In contrast,
when you download data from the cloud it ends up in a file, which has
no associated metadata. Thus, unless you have some way to hold on to
or re-create that metadata, downloading to a file will not retain the
@@ -296,20 +196,63 @@
ACL to the new object, and instead will use the default bucket ACL (see
"gsutil help defacl"). You can override this behavior with the -p
option (see OPTIONS below).
+
+ One additional note about copying in the cloud: If the destination bucket has
+ versioning enabled, gsutil cp will copy all versions of the source object(s).
+ For example:
+
+ gsutil cp gs://bucket1/obj gs://bucket2
+
+ will cause all versions of gs://bucket1/obj to be copied to gs://bucket2.
"""
+FAILURE_HANDLING_TEXT = """
+<B>CHECKSUM VALIDATION AND FAILURE HANDLING</B>
+ At the end of every upload or download, the gsutil cp command validates
+ that the checksum of the source file/object matches the checksum of the
+ destination file/object. If the checksums do not match, gsutil will delete
+ the invalid copy and print a warning message. This very rarely happens, but
+ if it does, please contact gs-team@google.com.
+
+ The cp command will retry when failures occur, but if enough failures happen
+ during a particular copy or delete operation the command will skip that object
+ and move on. At the end of the copy run if any failures were not successfully
+ retried, the cp command will report the count of failures, and exit with
+ non-zero status.
+
+ Note that there are cases where retrying will never succeed, such as if you
+ don't have write permission to the destination bucket or if the destination
+ path for some objects is longer than the maximum allowed length.
+
+ For more details about gsutil's retry handling, please see
+ "gsutil help retries".
+"""
+
RESUMABLE_TRANSFERS_TEXT = """
<B>RESUMABLE TRANSFERS</B>
- gsutil automatically uses the Google Cloud Storage resumable upload
- feature whenever you use the cp command to upload an object that is larger
- than 2 MB. You do not need to specify any special command line options
- to make this happen. If your upload is interrupted you can restart the
- upload by running the same cp command that you ran to start the upload.
+ gsutil automatically uses the Google Cloud Storage resumable upload feature
+ whenever you use the cp command to upload an object that is larger than 2
+ MB. You do not need to specify any special command line options to make this
+ happen. If your upload is interrupted you can restart the upload by running
+ the same cp command that you ran to start the upload. Until the upload
+ has completed successfully, it will not be visible at the destination object
+ and will not replace any existing object the upload is intended to overwrite.
+ (However, see the section on PARALLEL COMPOSITE UPLOADS, which may leave
+ temporary component objects in place during the upload process.)
Similarly, gsutil automatically performs resumable downloads (using HTTP
standard Range GET operations) whenever you use the cp command to download an
- object larger than 2 MB.
+ object larger than 2 MB. In this case the partially downloaded file will be
+ visible as soon as it starts being written. Thus, before you attempt to use
+ any files downloaded by gsutil you should make sure the download completed
+ successfully, by checking the exit status from the gsutil command. This can
+ be done in a bash script, for example, by doing:
+ gsutil cp gs://your-bucket/your-object ./local-file
+ status=$?
+ if [ "$status" -ne "0" ] ; then
+ << Code that handles failures >>
+ fi
+
Resumable uploads and downloads store some state information in a file
in ~/.gsutil named by the destination object or file. If you attempt to
resume a transfer from a machine with a different directory, the transfer
@@ -321,7 +264,7 @@
STREAMING_TRANSFERS_TEXT = """
<B>STREAMING TRANSFERS</B>
- Use '-' in place of src_uri or dst_uri to perform a streaming
+ Use '-' in place of src_url or dst_url to perform a streaming
transfer. For example:
long_running_computation | gsutil cp - gs://my_bucket/obj
@@ -333,20 +276,29 @@
PARALLEL_COMPOSITE_UPLOADS_TEXT = """
<B>PARALLEL COMPOSITE UPLOADS</B>
- gsutil automatically uses `object composition <https://developers.google.com/storage/docs/composite-objects>`_
- to perform uploads in parallel for large, local files being uploaded to
- Google Cloud Storage. This means that, by default, a large file will be split
- into component pieces that will be uploaded in parallel. Those components will
- then be composed in the cloud, and the temporary components in the cloud will
- be deleted after successful composition. No additional local disk space is
- required for this operation.
+ gsutil can automatically use
+ `object composition <https://developers.google.com/storage/docs/composite-objects>`_
+ to perform uploads in parallel for large, local files being uploaded to Google
+ Cloud Storage. This means that, if enabled (see next paragraph), a large file
+ will be split into component pieces that will be uploaded in parallel. Those
+ components will then be composed in the cloud, and the temporary components in
+ the cloud will be deleted after successful composition. No additional local
+ disk space is required for this operation.
- Any file whose size exceeds the "parallel_composite_upload_threshold" config
- variable will trigger this feature by default. The ideal size of a
- component can also be set with the "parallel_composite_upload_component_size"
- config variable. See the .boto config file for details about how these values
- are used.
+ If the "parallel_composite_upload_threshold" config value is not 0 (which
+ disables the feature), any file whose size exceeds the specified size will
+ trigger a parallel composite upload. Note that at present parallel composite
+ uploads are disabled by default, because using composite objects requires a
+ compiled crcmod (see "gsutil help crcmod"), and for operating systems that
+ don't already have this package installed this makes gsutil harder to use.
+ Google is actively working with a number of the Linux distributions to get
+ crcmod included with the stock distribution. Once that is done we will
+ re-enable parallel composite uploads by default in gsutil.
+ The ideal size of a component can also be set with the
+ "parallel_composite_upload_component_size" config variable. See the comments
+ in the .boto config file for details about how these values are used.
+
If the transfer fails prior to composition, running the command again will
take advantage of resumable uploads for those components that failed, and
the component objects will be deleted after the first successful attempt.
@@ -357,6 +309,22 @@
where <random ID> is some numerical value, and <hash> is an MD5 hash (not
related to the hash of the contents of the file or object).
+ To avoid leaving temporary objects around, you should make sure to check the
+ exit status from the gsutil command. This can be done in a bash script, for
+ example, by doing:
+
+ gsutil cp ./local-file gs://your-bucket/your-object
+ status=$?
+ if [ "$status" -ne "0" ] ; then
+ << Code that handles failures >>
+ fi
+
+ Or, for copying a directory, use this instead:
+
+ gsutil cp -c -L cp.log -R ./dir gs://bucket
+ status=$?
+ if [ "$status" -ne "0" ] ; then
+ << Code that handles failures >>
+ fi
+
One important caveat is that files uploaded in this fashion are still subject
to the maximum number of components limit. For example, if you upload a large
file that gets split into %d components, and try to compose it with another
@@ -373,6 +341,7 @@
""" % (PARALLEL_UPLOAD_TEMP_NAMESPACE, 10, MAX_COMPONENT_COUNT - 9,
MAX_COMPONENT_COUNT)
+
CHANGING_TEMP_DIRECTORIES_TEXT = """
<B>CHANGING TEMP DIRECTORIES</B>
gsutil writes data to a temporary directory in several cases:
@@ -405,214 +374,191 @@
OPTIONS_TEXT = """
<B>OPTIONS</B>
- -a canned_acl Sets named canned_acl when uploaded objects created. See
- 'gsutil help acls' for further details.
+ -a canned_acl Sets named canned_acl when uploaded objects created. See
+ 'gsutil help acls' for further details.
- -c If an error occurrs, continue to attempt to copy the remaining
- files. Note that this option is always true when running
- "gsutil -m cp".
+ -c If an error occurs, continue to attempt to copy the remaining
+ files. If any copies were unsuccessful, gsutil's exit status
+ will be non-zero even if this flag is set. This option is
+ implicitly set when running "gsutil -m cp...". Note: -c only
+ applies to the actual copying operation. If an error occurs
+ while iterating over the files in the local directory (e.g.,
+ invalid Unicode file name) gsutil will print an error message
+ and abort.
- -D Copy in "daisy chain" mode, i.e., copying between two buckets by
- hooking a download to an upload, via the machine where gsutil is
- run. By default, data are copied between two buckets
- "in the cloud", i.e., without needing to copy via the machine
- where gsutil runs.
+ -D Copy in "daisy chain" mode, i.e., copying between two buckets
+ by hooking a download to an upload, via the machine where
+ gsutil is run. By default, data are copied between two buckets
+ "in the cloud", i.e., without needing to copy via the machine
+ where gsutil runs.
- By default, a "copy in the cloud" when the source is a composite
- object will retain the composite nature of the object. However,
- Daisy chain mode can be used to change a composite object into
- a non-composite object. For example:
+ By default, a "copy in the cloud" when the source is a
+ composite object will retain the composite nature of the
+ object. However, Daisy chain mode can be used to change a
+ composite object into a non-composite object. For example:
- gsutil cp -D -p gs://bucket/obj gs://bucket/obj_tmp
- gsutil mv -p gs://bucket/obj_tmp gs://bucket/obj
+ gsutil cp -D -p gs://bucket/obj gs://bucket/obj_tmp
+ gsutil mv -p gs://bucket/obj_tmp gs://bucket/obj
- Note: Daisy chain mode is automatically used when copying
- between providers (e.g., to copy data from Google Cloud Storage
- to another provider).
+ Note: Daisy chain mode is automatically used when copying
+ between providers (e.g., to copy data from Google Cloud Storage
+ to another provider).
- -e Exclude symlinks. When specified, symbolic links will not be
- copied.
+ -e Exclude symlinks. When specified, symbolic links will not be
+ copied.
- -L <file> Outputs a manifest log file with detailed information about each
- item that was copied. This manifest contains the following
- information for each item:
+ -I Causes gsutil to read the list of files or objects to copy from
+ stdin. This allows you to run a program that generates the list
+ of files to upload/download.
- - Source path.
- - Destination path.
- - Source size.
- - Bytes transferred.
- - MD5 hash.
- - UTC date and time transfer was started in ISO 8601 format.
- - UTC date and time transfer was completed in ISO 8601 format.
- - Upload id, if a resumable upload was performed.
- - Final result of the attempted transfer, success or failure.
- - Failure details, if any.
+ -L <file> Outputs a manifest log file with detailed information about
+ each item that was copied. This manifest contains the following
+ information for each item:
- If the log file already exists, gsutil will use the file as an
- input to the copy process, and will also append log items to the
- existing file. Files/objects that are marked in the existing log
- file as having been successfully copied (or skipped) will be
- ignored. Files/objects without entries will be copied and ones
- previously marked as unsuccessful will be retried. This can be
- used in conjunction with the -c option to build a script that
- copies a large number of objects reliably, using a bash script
- like the following:
+ - Source path.
+ - Destination path.
+ - Source size.
+ - Bytes transferred.
+ - MD5 hash.
+ - UTC date and time transfer was started in ISO 8601 format.
+ - UTC date and time transfer was completed in ISO 8601 format.
+ - Upload id, if a resumable upload was performed.
+ - Final result of the attempted transfer, success or failure.
+ - Failure details, if any.
- status=1
- while [ $status -ne 0 ] ; do
- gsutil cp -c -L cp.log -R ./dir gs://bucket
- status=$?
- done
+ If the log file already exists, gsutil will use the file as an
+ input to the copy process, and will also append log items to
+ the existing file. Files/objects that are marked in the
+ existing log file as having been successfully copied (or
+ skipped) will be ignored. Files/objects without entries will be
+ copied and ones previously marked as unsuccessful will be
+ retried. This can be used in conjunction with the -c option to
+ build a script that copies a large number of objects reliably,
+ using a bash script like the following:
- The -c option will cause copying to continue after failures
- occur, and the -L option will allow gsutil to pick up where it
- left off without duplicating work. The loop will continue
- running as long as gsutil exits with a non-zero status (such a
- status indicates there was at least one failure during the
- gsutil run).
+ until gsutil cp -c -L cp.log -R ./dir gs://bucket; do
+ sleep 1
+ done
- -n No-clobber. When specified, existing files or objects at the
- destination will not be overwritten. Any items that are skipped
- by this option will be reported as being skipped. This option
- will perform an additional HEAD request to check if an item
- exists before attempting to upload the data. This will save
- retransmitting data, but the additional HTTP requests may make
- small object transfers slower and more expensive.
+ The -c option will cause copying to continue after failures
+ occur, and the -L option will allow gsutil to pick up where it
+ left off without duplicating work. The loop will continue
+ running as long as gsutil exits with a non-zero status (such a
+ status indicates there was at least one failure during the
+ gsutil run).
- -p Causes source ACLs to be preserved when copying in the cloud.
- Note that this option has performance and cost implications,
- because it is essentially performing three requests ('acl get',
- cp, 'acl set'). (The performance issue can be mitigated to some
- degree by using gsutil -m cp to cause parallel copying.)
+ Note: If you're trying to synchronize the contents of a
+ directory and a bucket (or two buckets), see
+ 'gsutil help rsync'.
- You can avoid the additional performance and cost of using cp -p
- if you want all objects in the destination bucket to end up with
- the same ACL by setting a default ACL on that bucket instead of
- using cp -p. See "help gsutil defacl".
+ -n No-clobber. When specified, existing files or objects at the
+ destination will not be overwritten. Any items that are skipped
+ by this option will be reported as being skipped. This option
+ will perform an additional GET request to check if an item
+ exists before attempting to upload the data. This will save
+ retransmitting data, but the additional HTTP requests may make
+ small object transfers slower and more expensive.
- Note that it's not valid to specify both the -a and -p options
- together.
+ -p Causes ACLs to be preserved when copying in the cloud. Note
+ that this option has performance and cost implications when
+ using the XML API, as it requires separate HTTP calls for
+ interacting with ACLs. The performance issue can be mitigated
+ to some degree by using gsutil -m cp to cause parallel copying.
+ Also, this option only works if you have OWNER access to all of
+ the objects that are copied.
- -q Deprecated. Please use gsutil -q cp ... instead.
+ You can avoid the additional performance and cost of using
+ cp -p if you want all objects in the destination bucket to end
+ up with the same ACL by setting a default object ACL on that
+ bucket instead of using cp -p. See "help gsutil defacl".
- -R, -r Causes directories, buckets, and bucket subdirectories to be
- copied recursively. If you neglect to use this option for
- an upload, gsutil will copy any files it finds and skip any
- directories. Similarly, neglecting to specify -R for a download
- will cause gsutil to copy any objects at the current bucket
- directory level, and skip any subdirectories.
+ Note that it's not valid to specify both the -a and -p options
+ together.
- -v Requests that the version-specific URI for each uploaded object
- be printed. Given this URI you can make future upload requests
- that are safe in the face of concurrent updates, because Google
- Cloud Storage will refuse to perform the update if the current
- object version doesn't match the version-specific URI. See
- 'gsutil help versions' for more details.
+ -R, -r Causes directories, buckets, and bucket subdirectories to be
+ copied recursively. If you neglect to use this option for
+ an upload, gsutil will copy any files it finds and skip any
+ directories. Similarly, neglecting to specify -R for a download
+ will cause gsutil to copy any objects at the current bucket
+ directory level, and skip any subdirectories.
- -z <ext,...> Applies gzip content-encoding to file uploads with the given
- extensions. This is useful when uploading files with
- compressible content (such as .js, .css, or .html files) because
- it saves network bandwidth and space in Google Cloud Storage,
- which in turn reduces storage costs.
+ -v Requests that the version-specific URL for each uploaded object
+ be printed. Given this URL you can make future upload requests
+ that are safe in the face of concurrent updates, because Google
+ Cloud Storage will refuse to perform the update if the current
+ object version doesn't match the version-specific URL. See
+ 'gsutil help versions' for more details.
- When you specify the -z option, the data from your files is
- compressed before it is uploaded, but your actual files are left
- uncompressed on the local disk. The uploaded objects retain the
- Content-Type and name of the original files but are given a
- Content-Encoding header with the value "gzip" to indicate that
- the object data stored are compressed on the Google Cloud
- Storage servers.
+ -z <ext,...> Applies gzip content-encoding to file uploads with the given
+ extensions. This is useful when uploading files with
+ compressible content (such as .js, .css, or .html files)
+ because it saves network bandwidth and space in Google Cloud
+ Storage, which in turn reduces storage costs.
- For example, the following command:
+ When you specify the -z option, the data from your files is
+ compressed before it is uploaded, but your actual files are
+ left uncompressed on the local disk. The uploaded objects
+ retain the Content-Type and name of the original files but are
+ given a Content-Encoding header with the value "gzip" to
+ indicate that the object data stored are compressed on the
+ Google Cloud Storage servers.
- gsutil cp -z html -a public-read cattypes.html gs://mycats
+ For example, the following command:
- will do all of the following:
+ gsutil cp -z html -a public-read cattypes.html gs://mycats
- - Upload as the object gs://mycats/cattypes.html (cp command)
- - Set the Content-Type to text/html (based on file extension)
- - Compress the data in the file cattypes.html (-z option)
- - Set the Content-Encoding to gzip (-z option)
- - Set the ACL to public-read (-a option)
- - If a user tries to view cattypes.html in a browser, the
- browser will know to uncompress the data based on the
- Content-Encoding header, and to render it as HTML based on
- the Content-Type header.
+ will do all of the following:
+
+ - Upload as the object gs://mycats/cattypes.html (cp command)
+ - Set the Content-Type to text/html (based on file extension)
+ - Compress the data in the file cattypes.html (-z option)
+ - Set the Content-Encoding to gzip (-z option)
+ - Set the ACL to public-read (-a option)
+ - If a user tries to view cattypes.html in a browser, the
+ browser will know to uncompress the data based on the
+ Content-Encoding header, and to render it as HTML based on
+ the Content-Type header.
+
+ Note that if you download an object with Content-Encoding:gzip
+ gsutil will decompress the content before writing the local
+ file.
"""
-_detailed_help_text = '\n\n'.join([SYNOPSIS_TEXT,
+_DETAILED_HELP_TEXT = '\n\n'.join([SYNOPSIS_TEXT,
DESCRIPTION_TEXT,
NAME_CONSTRUCTION_TEXT,
SUBDIRECTORIES_TEXT,
COPY_IN_CLOUD_TEXT,
+ FAILURE_HANDLING_TEXT,
RESUMABLE_TRANSFERS_TEXT,
STREAMING_TRANSFERS_TEXT,
PARALLEL_COMPOSITE_UPLOADS_TEXT,
CHANGING_TEMP_DIRECTORIES_TEXT,
OPTIONS_TEXT])
-# This tuple is used only to encapsulate the arguments needed for
-# _PerformResumableUploadIfApplies, so that the arguments fit the model of
-# command.Apply().
-PerformResumableUploadIfAppliesArgs = namedtuple(
- 'PerformResumableUploadIfAppliesArgs',
- 'filename file_start file_length src_uri dst_uri canned_acl headers '
- 'tracker_file tracker_file_lock')
-ObjectFromTracker = namedtuple('ObjectFromTracker',
- 'object_name generation')
+CP_SUB_ARGS = 'a:cDeIL:MNnprRtvz:'
-CP_SUB_ARGS = 'a:cDeIL:MNnpqrRtvz:'
-# The maximum length of a file name can vary wildly between different
-# operating systems, so we always ensure that tracker files are less
-# than 100 characters in order to avoid any such issues.
-MAX_TRACKER_FILE_NAME_LENGTH = 100
+def _CopyFuncWrapper(cls, args, thread_state=None):
+ cls.CopyFunc(args, thread_state=thread_state)
-class TrackerFileType(object):
- UPLOAD = 1
- DOWNLOAD = 2
- PARALLEL_UPLOAD = 3
-
-def _CopyFuncWrapper(cls, args):
- cls._CopyFunc(args)
-
-def _PerformResumableUploadIfAppliesWrapper(cls, args):
- """A wrapper for cp._PerformResumableUploadIfApplies, which takes in a
- PerformResumableUploadIfAppliesArgs, extracts the arguments to form the
- arguments for the wrapped function, and then calls the wrapped function.
- This was designed specifically for use with command.Apply().
- """
- fp = FilePart(args.filename, args.file_start, args.file_length)
- with fp:
- already_split = True
- ret = cls._PerformResumableUploadIfApplies(
- fp, args.src_uri, args.dst_uri, args.canned_acl, args.headers,
- fp.length, already_split)
-
- # Update the tracker file after each call in order to be as robust as possible
- # against interrupts, failures, etc.
- component = ret[2]
- _AppendComponentTrackerToParallelUploadTrackerFile(args.tracker_file,
- component,
- args.tracker_file_lock)
- return ret
-
def _CopyExceptionHandler(cls, e):
"""Simple exception handler to allow post-completion status."""
cls.logger.error(str(e))
- cls.copy_failure_count += 1
- cls.logger.debug(('\n\nEncountered exception while copying:\n%s\n' %
- traceback.format_exc()))
+ cls.op_failure_count += 1
+ cls.logger.debug('\n\nEncountered exception while copying:\n%s\n',
+ traceback.format_exc())
+
def _RmExceptionHandler(cls, e):
"""Simple exception handler to allow post-completion status."""
cls.logger.error(str(e))
+
class CpCommand(Command):
- """
- Implementation of gsutil cp command.
+ """Implementation of gsutil cp command.
Note that CpCommand is run for both gsutil cp and gsutil mv. The latter
happens by MvCommand calling CpCommand and passing the hidden (undocumented)
@@ -631,1648 +577,243 @@
of processing the bucket listing iterator on the fly).
"""
- # TODO: Refactor this file to be less cumbersome. In particular, some of the
- # different paths (e.g., uploading a file to an object vs. downloading an
- # object to a file) could be split into separate files.
+ # Command specification. See base class for documentation.
+ command_spec = Command.CreateCommandSpec(
+ 'cp',
+ command_name_aliases=['copy'],
+ min_args=1,
+ max_args=NO_MAX,
+ # -t is deprecated but leave intact for now to avoid breakage.
+ supported_sub_args=CP_SUB_ARGS,
+ file_url_ok=True,
+ provider_url_ok=False,
+ urls_start_arg=0,
+ gs_api_support=[ApiSelector.XML, ApiSelector.JSON],
+ gs_default_api=ApiSelector.JSON,
+ supported_private_args=['haltatbyte='],
+ )
+ # Help specification. See help_provider.py for documentation.
+ help_spec = Command.HelpSpec(
+ help_name='cp',
+ help_name_aliases=['copy'],
+ help_type='command_help',
+ help_one_line_summary='Copy files and objects',
+ help_text=_DETAILED_HELP_TEXT,
+ subcommand_help_text={},
+ )
- # Set default Content-Type type.
- DEFAULT_CONTENT_TYPE = 'application/octet-stream'
- USE_MAGICFILE = boto.config.getbool('GSUtil', 'use_magicfile', False)
- # Chunk size to use while unzipping gzip files.
- GUNZIP_CHUNK_SIZE = 8192
-
- # Command specification (processed by parent class).
- command_spec = {
- # Name of command.
- COMMAND_NAME : 'cp',
- # List of command name aliases.
- COMMAND_NAME_ALIASES : ['copy'],
- # Min number of args required by this command.
- MIN_ARGS : 1,
- # Max number of args required by this command, or NO_MAX.
- MAX_ARGS : NO_MAX,
- # Getopt-style string specifying acceptable sub args.
- # -t is deprecated but leave intact for now to avoid breakage.
- SUPPORTED_SUB_ARGS : CP_SUB_ARGS,
- # True if file URIs acceptable for this command.
- FILE_URIS_OK : True,
- # True if provider-only URIs acceptable for this command.
- PROVIDER_URIS_OK : False,
- # Index in args of first URI arg.
- URIS_START_ARG : 0,
- }
- help_spec = {
- # Name of command or auxiliary help info for which this help applies.
- HELP_NAME : 'cp',
- # List of help name aliases.
- HELP_NAME_ALIASES : ['copy'],
- # Type of help:
- HELP_TYPE : HelpType.COMMAND_HELP,
- # One line summary of this help.
- HELP_ONE_LINE_SUMMARY : 'Copy files and objects',
- # The full help text.
- HELP_TEXT : _detailed_help_text,
- }
-
- def _GetMD5FromETag(self, key):
- if not key.etag:
- return None
- possible_md5 = key.etag.strip('"\'').lower()
- if re.match(r'^[0-9a-f]{32}$', possible_md5):
- return binascii.a2b_hex(possible_md5)
-
- def _CheckHashes(self, key, file_name, hash_algs_to_compute,
- computed_hashes=None):
- """Validates integrity by comparing cloud digest to local digest.
-
- Args:
- key: Instance of boto Key object.
- file_name: Name of downloaded file on local disk.
- hash_algs_to_compute: Dictionary mapping hash algorithm names to digester
- objects.
- computed_hashes: If specified, use this dictionary mapping hash algorithm
- names to the calculated digest. If not specified, the
- key argument will be checked for local_digests property.
- If neither exist, the local file will be opened and
- digests calculated on-demand.
-
- Raises:
- CommandException: if cloud digests don't match local digests.
- """
- cloud_hashes = {}
- if hasattr(key, 'cloud_hashes'):
- cloud_hashes = key.cloud_hashes
- # Check for older-style MD5-based etag.
- etag_md5 = self._GetMD5FromETag(key)
- if etag_md5:
- cloud_hashes.setdefault('md5', etag_md5)
-
- local_hashes = {}
- # If we've already computed a valid local hash, use that, else calculate an
- # md5 or crc32c depending on what we have available to compare against.
- if computed_hashes:
- local_hashes = computed_hashes
- elif hasattr(key, 'local_hashes') and key.local_hashes:
- local_hashes = key.local_hashes
- elif 'md5' in cloud_hashes and 'md5' in hash_algs_to_compute:
- self.logger.info(
- 'Computing MD5 from scratch for resumed download')
-
- # Open file in binary mode to avoid surprises in Windows.
- with open(file_name, 'rb') as fp:
- local_hashes['md5'] = binascii.a2b_hex(key.compute_md5(fp)[0])
- elif 'crc32c' in cloud_hashes and 'crc32c' in hash_algs_to_compute:
- self.logger.info(
- 'Computing CRC32C from scratch for resumed download')
-
- # Open file in binary mode to avoid surprises in Windows.
- with open(file_name, 'rb') as fp:
- crc32c_alg = lambda: crcmod.predefined.Crc('crc-32c')
- crc32c_hex = key.compute_hash(
- fp, algorithm=crc32c_alg)[0]
- local_hashes['crc32c'] = binascii.a2b_hex(crc32c_hex)
-
- for alg in local_hashes:
- if alg not in cloud_hashes:
- continue
- local_hexdigest = binascii.b2a_hex(local_hashes[alg])
- cloud_hexdigest = binascii.b2a_hex(cloud_hashes[alg])
- self.logger.debug('Comparing local vs cloud %s-checksum. (%s/%s)' % (
- alg, local_hexdigest, cloud_hexdigest))
- if local_hexdigest != cloud_hexdigest:
- raise CommandException(
- '%s signature computed for local file (%s) doesn\'t match '
- 'cloud-supplied digest (%s). Local file (%s) deleted.' % (
- alg, local_hexdigest, cloud_hexdigest, file_name))
-
- def _CheckForDirFileConflict(self, exp_src_uri, dst_uri):
- """Checks whether copying exp_src_uri into dst_uri is not possible.
-
- This happens if a directory exists in local file system where a file
- needs to go or vice versa. In that case we print an error message and
- exits. Example: if the file "./x" exists and you try to do:
- gsutil cp gs://mybucket/x/y .
- the request can't succeed because it requires a directory where
- the file x exists.
-
- Note that we don't enforce any corresponding restrictions for buckets,
- because the flat namespace semantics for buckets doesn't prohibit such
- cases the way hierarchical file systems do. For example, if a bucket
- contains an object called gs://bucket/dir and then you run the command:
- gsutil cp file1 file2 gs://bucket/dir
- you'll end up with objects gs://bucket/dir, gs://bucket/dir/file1, and
- gs://bucket/dir/file2.
-
- Args:
- exp_src_uri: Expanded source StorageUri of copy.
- dst_uri: Destination URI.
-
- Raises:
- CommandException: if errors encountered.
- """
- if dst_uri.is_cloud_uri():
- # The problem can only happen for file destination URIs.
- return
- dst_path = dst_uri.object_name
- final_dir = os.path.dirname(dst_path)
- if os.path.isfile(final_dir):
- raise CommandException('Cannot retrieve %s because a file exists '
- 'where a directory needs to be created (%s).' %
- (exp_src_uri, final_dir))
- if os.path.isdir(dst_path):
- raise CommandException('Cannot retrieve %s because a directory exists '
- '(%s) where the file needs to be created.' %
- (exp_src_uri, dst_path))
-
- def _InsistDstUriNamesContainer(self, exp_dst_uri,
- have_existing_dst_container, command_name):
- """
- Raises an exception if URI doesn't name a directory, bucket, or bucket
- subdir, with special exception for cp -R (see comments below).
-
- Args:
- exp_dst_uri: Wildcard-expanding dst_uri.
- have_existing_dst_container: bool indicator of whether exp_dst_uri
- names a container (directory, bucket, or existing bucket subdir).
- command_name: Name of command making call. May not be the same as
- self.command_name in the case of commands implemented atop other
- commands (like mv command).
-
- Raises:
- CommandException: if the URI being checked does not name a container.
- """
- if exp_dst_uri.is_file_uri():
- ok = exp_dst_uri.names_directory()
- else:
- if have_existing_dst_container:
- ok = True
- else:
- # It's ok to specify a non-existing bucket subdir, for example:
- # gsutil cp -R dir gs://bucket/abc
- # where gs://bucket/abc isn't an existing subdir.
- ok = exp_dst_uri.names_object()
- if not ok:
- raise CommandException('Destination URI must name a directory, bucket, '
- 'or bucket\nsubdirectory for the multiple '
- 'source form of the %s command.' % command_name)
-
- class _FileCopyCallbackHandler(object):
- """Outputs progress info for large copy requests."""
-
- def __init__(self, upload, logger):
- if upload:
- self.announce_text = 'Uploading'
- else:
- self.announce_text = 'Downloading'
- self.logger = logger
-
- def call(self, total_bytes_transferred, total_size):
- # Use sys.stderr.write instead of self.logger.info so progress messages
- # output on a single continuously overwriting line.
- if self.logger.isEnabledFor(logging.INFO):
- sys.stderr.write('%s: %s/%s \r' % (
- self.announce_text,
- MakeHumanReadable(total_bytes_transferred),
- MakeHumanReadable(total_size)))
- if total_bytes_transferred == total_size:
- sys.stderr.write('\n')
-
- class _StreamCopyCallbackHandler(object):
- """Outputs progress info for Stream copy to cloud.
- Total Size of the stream is not known, so we output
- only the bytes transferred.
- """
-
- def __init__(self, logger):
- self.logger = logger
-
- def call(self, total_bytes_transferred, total_size):
- # Use sys.stderr.write instead of self.logger.info so progress messages
- # output on a single continuously overwriting line.
- if self.logger.isEnabledFor(logging.INFO):
- sys.stderr.write('Uploading: %s \r' %
- MakeHumanReadable(total_bytes_transferred))
- if total_size and total_bytes_transferred == total_size:
- sys.stderr.write('\n')
-
- def _GetTransferHandlers(self, dst_uri, size, upload):
- """
- Selects upload/download and callback handlers.
-
- We use a callback handler that shows a simple textual progress indicator
- if size is above the configurable threshold.
-
- We use a resumable transfer handler if size is >= the configurable
- threshold and resumable transfers are supported by the given provider.
- boto supports resumable downloads for all providers, but resumable
- uploads are currently only supported by GS.
-
- Args:
- dst_uri: the destination URI.
- size: size of file (object) being uploaded (downloaded).
- upload: bool indication of whether transfer is an upload.
- """
- config = boto.config
- resumable_threshold = config.getint('GSUtil', 'resumable_threshold', TWO_MB)
- transfer_handler = None
- cb = None
- num_cb = None
-
- # Checks whether the destination file is a "special" file, like /dev/null on
- # Linux platforms or null on Windows platforms, so we can disable resumable
- # download support since the file size of the destination won't ever be
- # correct.
- dst_is_special = False
- if dst_uri.is_file_uri():
- # Check explicitly first because os.stat doesn't work on 'nul' in Windows.
- if dst_uri.object_name == os.devnull:
- dst_is_special = True
- try:
- mode = os.stat(dst_uri.object_name).st_mode
- if stat.S_ISCHR(mode):
- dst_is_special = True
- except OSError:
- pass
-
- if size >= resumable_threshold and not dst_is_special:
- cb = self._FileCopyCallbackHandler(upload, self.logger).call
- num_cb = int(size / TWO_MB)
-
- if upload:
- tracker_file_type = TrackerFileType.UPLOAD
- else:
- tracker_file_type = TrackerFileType.DOWNLOAD
- tracker_file = self._GetTrackerFilePath(dst_uri, tracker_file_type)
-
- if upload:
- if dst_uri.scheme == 'gs':
- is_secure = BOTO_IS_SECURE
- if not is_secure[0]:
- self.logger.info('\n'.join(textwrap.wrap(
- 'WARNING: Your boto config file (%s) has is_secure set to '
- 'False. Resumable uploads are not secure when performed with '
- 'this configuration, so large files are being uploaded with '
- 'non-resumable uploads instead.' % GetConfigFilePath())))
- else:
- transfer_handler = ResumableUploadHandler(tracker_file)
- else:
- transfer_handler = ResumableDownloadHandler(tracker_file)
-
- return (cb, num_cb, transfer_handler)
-
- def _GetTrackerFilePath(self, dst_uri, tracker_file_type, src_uri=None):
- resumable_tracker_dir = CreateTrackerDirIfNeeded()
- if tracker_file_type == TrackerFileType.UPLOAD:
- # Encode the dest bucket and object name into the tracker file name.
- res_tracker_file_name = (
- re.sub('[/\\\\]', '_', 'resumable_upload__%s__%s.url' %
- (dst_uri.bucket_name, dst_uri.object_name)))
- tracker_file_type_str = "upload"
- elif tracker_file_type == TrackerFileType.DOWNLOAD:
- # Encode the fully-qualified dest file name into the tracker file name.
- res_tracker_file_name = (
- re.sub('[/\\\\]', '_', 'resumable_download__%s.etag' %
- (os.path.realpath(dst_uri.object_name))))
- tracker_file_type_str = "download"
- elif tracker_file_type == TrackerFileType.PARALLEL_UPLOAD:
- # Encode the dest bucket and object names as well as the source file name
- # into the tracker file name.
- res_tracker_file_name = (
- re.sub('[/\\\\]', '_', 'parallel_upload__%s__%s__%s.url' %
- (dst_uri.bucket_name, dst_uri.object_name, src_uri)))
- tracker_file_type_str = "parallel_upload"
-
- res_tracker_file_name = _HashFilename(res_tracker_file_name)
- tracker_file_name = '%s_%s' % (tracker_file_type_str, res_tracker_file_name)
- tracker_file_path = '%s%s%s' % (resumable_tracker_dir, os.sep,
- tracker_file_name)
- assert(len(tracker_file_name) < MAX_TRACKER_FILE_NAME_LENGTH)
- return tracker_file_path
-
- def _LogCopyOperation(self, src_uri, dst_uri, headers):
- """
- Logs copy operation being performed, including Content-Type if appropriate.
- """
- if 'content-type' in headers and dst_uri.is_cloud_uri():
- content_type_msg = ' [Content-Type=%s]' % headers['content-type']
- else:
- content_type_msg = ''
- if src_uri.is_stream():
- self.logger.info('Copying from <STDIN>%s...', content_type_msg)
- else:
- self.logger.info('Copying %s%s...', src_uri, content_type_msg)
-
- def _ProcessCopyObjectToObjectOptions(self, dst_uri, headers):
- """
- Common option processing between _CopyObjToObjInTheCloud and
- _CopyObjToObjDaisyChainMode.
- """
- preserve_acl = False
- canned_acl = None
- if self.sub_opts:
- for o, a in self.sub_opts:
- if o == '-a':
- canned_acls = dst_uri.canned_acls()
- if a not in canned_acls:
- raise CommandException('Invalid canned ACL "%s".' % a)
- canned_acl = a
- headers[dst_uri.get_provider().acl_header] = canned_acl
- if o == '-p':
- preserve_acl = True
- if preserve_acl and canned_acl:
- raise CommandException(
- 'Specifying both the -p and -a options together is invalid.')
- return (preserve_acl, canned_acl, headers)
-
- # We pass the headers explicitly to this call instead of using self.headers
- # so we can set different metadata (like Content-Type type) for each object.
- def _CopyObjToObjInTheCloud(self, src_key, src_uri, dst_uri, headers):
- """Performs copy-in-the cloud from specified src to dest object.
-
- Args:
- src_key: Source Key.
- src_uri: Source StorageUri.
- dst_uri: Destination StorageUri.
- headers: A copy of the top-level headers dictionary.
-
- Returns:
- (elapsed_time, bytes_transferred, dst_uri) excluding overhead like initial
- HEAD. Note: At present copy-in-the-cloud doesn't return the generation of
- the created object, so the returned URI is actually not version-specific
- (unlike other cp cases).
-
- Raises:
- CommandException: if errors encountered.
- """
- self._SetContentTypeHeader(src_uri, headers)
- self._LogCopyOperation(src_uri, dst_uri, headers)
- # Do Object -> object copy within same provider (uses
- # x-<provider>-copy-source metadata HTTP header to request copying at the
- # server).
- src_bucket = src_uri.get_bucket(False, headers)
- (preserve_acl, canned_acl, headers) = (
- self._ProcessCopyObjectToObjectOptions(dst_uri, headers))
- start_time = time.time()
- # Pass headers in headers param not metadata param, so boto will copy
- # existing key's metadata and just set the additional headers specified
- # in the headers param (rather than using the headers to override existing
- # metadata). In particular this allows us to copy the existing key's
- # Content-Type and other metadata users need while still being able to
- # set headers the API needs (like x-goog-project-id). Note that this means
- # you can't do something like:
- # gsutil cp -t Content-Type text/html gs://bucket/* gs://bucket2
- # to change the Content-Type while copying.
- dst_key = dst_uri.copy_key(
- src_bucket.name, src_uri.object_name, preserve_acl=preserve_acl,
- headers=headers, src_version_id=src_uri.version_id,
- src_generation=src_uri.generation)
-
- end_time = time.time()
- return (end_time - start_time, src_key.size,
- dst_uri.clone_replace_key(dst_key))
-
- def _CheckFreeSpace(self, path):
- """Return path/drive free space (in bytes)."""
- if platform.system() == 'Windows':
- from ctypes import c_int, c_uint64, c_wchar_p, windll, POINTER, WINFUNCTYPE, WinError
- try:
- GetDiskFreeSpaceEx = WINFUNCTYPE(c_int, c_wchar_p, POINTER(c_uint64),
- POINTER(c_uint64), POINTER(c_uint64))
- GetDiskFreeSpaceEx = GetDiskFreeSpaceEx(
- ('GetDiskFreeSpaceExW', windll.kernel32), (
- (1, 'lpszPathName'),
- (2, 'lpFreeUserSpace'),
- (2, 'lpTotalSpace'),
- (2, 'lpFreeSpace'),))
- except AttributeError:
- GetDiskFreeSpaceEx = WINFUNCTYPE(c_int, c_char_p, POINTER(c_uint64),
- POINTER(c_uint64), POINTER(c_uint64))
- GetDiskFreeSpaceEx = GetDiskFreeSpaceEx(
- ('GetDiskFreeSpaceExA', windll.kernel32), (
- (1, 'lpszPathName'),
- (2, 'lpFreeUserSpace'),
- (2, 'lpTotalSpace'),
- (2, 'lpFreeSpace'),))
-
- def GetDiskFreeSpaceEx_errcheck(result, func, args):
- if not result:
- raise WinError()
- return args[1].value
- GetDiskFreeSpaceEx.errcheck = GetDiskFreeSpaceEx_errcheck
-
- return GetDiskFreeSpaceEx(os.getenv('SystemDrive'))
- else:
- (_, f_frsize, _, _, f_bavail, _, _, _, _, _) = os.statvfs(path)
- return f_frsize * f_bavail
-
- def _PerformResumableUploadIfApplies(self, fp, src_uri, dst_uri, canned_acl,
- headers, file_size, already_split=False):
- """
- Performs resumable upload if supported by provider and file is above
- threshold, else performs non-resumable upload.
-
- Returns (elapsed_time, bytes_transferred, version-specific dst_uri).
- """
- start_time = time.time()
- (cb, num_cb, res_upload_handler) = self._GetTransferHandlers(
- dst_uri, file_size, True)
- if dst_uri.scheme == 'gs':
- # Resumable upload protocol is Google Cloud Storage-specific.
- dst_uri.set_contents_from_file(fp, headers, policy=canned_acl,
- cb=cb, num_cb=num_cb,
- res_upload_handler=res_upload_handler)
- else:
- dst_uri.set_contents_from_file(fp, headers, policy=canned_acl,
- cb=cb, num_cb=num_cb)
- if res_upload_handler:
- # ResumableUploadHandler does not update upload_start_point from its
- # initial value of -1 if transferring the whole file, so clamp at 0
- bytes_transferred = file_size - max(
- res_upload_handler.upload_start_point, 0)
- if self.use_manifest and not already_split:
- # Save the upload indentifier in the manifest file, unless we're
- # uploading a temporary component for parallel composite uploads.
- self.manifest.Set(
- src_uri, 'upload_id', res_upload_handler.get_upload_id())
- else:
- bytes_transferred = file_size
- end_time = time.time()
- return (end_time - start_time, bytes_transferred, dst_uri)
-
- def _PerformStreamingUpload(self, fp, dst_uri, headers, canned_acl=None):
- """
- Performs a streaming upload to the cloud.
-
- Args:
- fp: The file whose contents to upload.
- dst_uri: Destination StorageUri.
- headers: A copy of the top-level headers dictionary.
- canned_acl: Optional canned ACL to set on the object.
-
- Returns (elapsed_time, bytes_transferred, version-specific dst_uri).
- """
- start_time = time.time()
-
- cb = self._StreamCopyCallbackHandler(self.logger).call
- dst_uri.set_contents_from_stream(
- fp, headers, policy=canned_acl, cb=cb)
- try:
- bytes_transferred = fp.tell()
- except:
- bytes_transferred = 0
-
- end_time = time.time()
- return (end_time - start_time, bytes_transferred, dst_uri)
-
- def _SetContentTypeHeader(self, src_uri, headers):
- """
- Sets content type header to value specified in '-h Content-Type' option (if
- specified); else sets using Content-Type detection.
- """
- if 'content-type' in headers:
- # If empty string specified (i.e., -h "Content-Type:") set header to None,
- # which will inhibit boto from sending the CT header. Otherwise, boto will
- # pass through the user specified CT header.
- if not headers['content-type']:
- headers['content-type'] = None
- # else we'll keep the value passed in via -h option (not performing
- # content type detection).
- else:
- # Only do content type recognition is src_uri is a file. Object-to-object
- # copies with no -h Content-Type specified re-use the content type of the
- # source object.
- if src_uri.is_file_uri():
- object_name = src_uri.object_name
- content_type = None
- # Streams (denoted by '-') are expected to be 'application/octet-stream'
- # and 'file' would partially consume them.
- if object_name != '-':
- if self.USE_MAGICFILE:
- p = subprocess.Popen(['file', '--mime-type', object_name],
- stdout=subprocess.PIPE, stderr=subprocess.PIPE)
- output, error = p.communicate()
- if p.returncode != 0 or error:
- raise CommandException(
- 'Encountered error running "file --mime-type %s" '
- '(returncode=%d).\n%s' % (object_name, p.returncode, error))
- # Parse output by removing line delimiter and splitting on last ":
- content_type = output.rstrip().rpartition(': ')[2]
- else:
- content_type = mimetypes.guess_type(object_name)[0]
- if not content_type:
- content_type = self.DEFAULT_CONTENT_TYPE
- headers['content-type'] = content_type
-
- def _GetFileSize(self, fp):
- """Determines file size different ways for case where fp is actually a
- wrapper around a Key vs an actual file.
-
- Args:
- The file whose size we wish to determine.
-
- Returns:
- The size of the file, in bytes.
- """
- if isinstance(fp, KeyFile):
- return fp.getkey().size
- else:
- return os.path.getsize(fp.name)
-
- def _UploadFileToObject(self, src_key, src_uri, dst_uri, headers,
- should_log=True, allow_splitting=True):
- """Uploads a local file to an object.
-
- Args:
- src_key: Source StorageUri. Must be a file URI.
- src_uri: Source StorageUri.
- dst_uri: Destination StorageUri.
- headers: The headers dictionary.
- should_log: bool indicator whether we should log this operation.
- Returns:
- (elapsed_time, bytes_transferred, version-specific dst_uri), excluding
- overhead like initial HEAD.
-
- Raises:
- CommandException: if errors encountered.
- """
- gzip_exts = []
- canned_acl = None
- if self.sub_opts:
- for o, a in self.sub_opts:
- if o == '-a':
- canned_acls = dst_uri.canned_acls()
- if a not in canned_acls:
- raise CommandException('Invalid canned ACL "%s".' % a)
- canned_acl = a
- elif o == '-t':
- self.logger.warning(
- 'Warning: -t is deprecated, and will be removed in the future. '
- 'Content type\ndetection is '
- 'now performed by default, unless inhibited by specifying '
- 'a\nContent-Type header via the -h option.')
- elif o == '-z':
- gzip_exts = a.split(',')
-
- self._SetContentTypeHeader(src_uri, headers)
- if should_log:
- self._LogCopyOperation(src_uri, dst_uri, headers)
-
- if 'content-language' not in headers:
- content_language = config.get_value('GSUtil', 'content_language')
- if content_language:
- headers['content-language'] = content_language
-
- fname_parts = src_uri.object_name.split('.')
- if len(fname_parts) > 1 and fname_parts[-1] in gzip_exts:
- self.logger.debug('Compressing %s (to tmp)...', src_key)
- (gzip_fh, gzip_path) = tempfile.mkstemp()
- gzip_fp = None
- try:
- # Check for temp space. Assume the compressed object is at most 2x
- # the size of the object (normally should compress to smaller than
- # the object)
- if (self._CheckFreeSpace(gzip_path)
- < 2*int(os.path.getsize(src_key.name))):
- raise CommandException('Inadequate temp space available to compress '
- '%s' % src_key.name)
- gzip_fp = gzip.open(gzip_path, 'wb')
- gzip_fp.writelines(src_key.fp)
- finally:
- if gzip_fp:
- gzip_fp.close()
- os.close(gzip_fh)
- headers['content-encoding'] = 'gzip'
- gzip_fp = open(gzip_path, 'rb')
- try:
- file_size = self._GetFileSize(gzip_fp)
- (elapsed_time, bytes_transferred, result_uri) = (
- self._PerformResumableUploadIfApplies(gzip_fp, src_uri, dst_uri,
- canned_acl, headers,
- file_size))
- finally:
- gzip_fp.close()
- try:
- os.unlink(gzip_path)
- # Windows sometimes complains the temp file is locked when you try to
- # delete it.
- except Exception, e:
- pass
- elif (src_key.is_stream()
- and dst_uri.get_provider().supports_chunked_transfer()):
- (elapsed_time, bytes_transferred, result_uri) = (
- self._PerformStreamingUpload(src_key.fp, dst_uri, headers,
- canned_acl))
- else:
- if src_key.is_stream():
- # For Providers that don't support chunked Transfers
- tmp = tempfile.NamedTemporaryFile()
- file_uri = self.suri_builder.StorageUri('file://%s' % tmp.name)
- try:
- file_uri.new_key(False, headers).set_contents_from_file(
- src_key.fp, headers)
- src_key = file_uri.get_key()
- finally:
- file_uri.close()
- try:
- fp = src_key.fp
- file_size = self._GetFileSize(fp)
- if self._ShouldDoParallelCompositeUpload(allow_splitting, src_key,
- dst_uri, file_size):
- (elapsed_time, bytes_transferred, result_uri) = (
- self._DoParallelCompositeUpload(fp, src_uri, dst_uri, headers,
- canned_acl, file_size))
- else:
- (elapsed_time, bytes_transferred, result_uri) = (
- self._PerformResumableUploadIfApplies(
- src_key.fp, src_uri, dst_uri, canned_acl, headers,
- self._GetFileSize(src_key.fp)))
- finally:
- if src_key.is_stream():
- tmp.close()
- else:
- src_key.close()
- return (elapsed_time, bytes_transferred, result_uri)
-
- def _GetHashAlgs(self, key):
- hash_algs = {}
- check_hashes_config = config.get(
- 'GSUtil', 'check_hashes', 'if_fast_else_fail')
- if check_hashes_config == 'never':
- return hash_algs
- if self._GetMD5FromETag(key):
- hash_algs['md5'] = md5
- if hasattr(key, 'cloud_hashes') and key.cloud_hashes:
- if 'md5' in key.cloud_hashes:
- hash_algs['md5'] = md5
- # If the cloud provider supplies a CRC, we'll compute a checksum to
- # validate if we're using a native crcmod installation or MD5 isn't
- # offered as an alternative.
- if 'crc32c' in key.cloud_hashes:
- if UsingCrcmodExtension(crcmod):
- hash_algs['crc32c'] = lambda: crcmod.predefined.Crc('crc-32c')
- elif not hash_algs:
- if check_hashes_config == 'if_fast_else_fail':
- raise SLOW_CRC_EXCEPTION
- elif check_hashes_config == 'if_fast_else_skip':
- sys.stderr.write(NO_HASH_CHECK_WARNING)
- elif check_hashes_config == 'always':
- sys.stderr.write(SLOW_CRC_WARNING)
- hash_algs['crc32c'] = lambda: crcmod.predefined.Crc('crc-32c')
- else:
- raise CommandException(
- 'Your boto config \'check_hashes\' option is misconfigured.')
- elif not hash_algs:
- if check_hashes_config == 'if_fast_else_skip':
- sys.stderr.write(NO_SERVER_HASH_WARNING)
- else:
- raise NO_SERVER_HASH_EXCEPTION
- return hash_algs
-
- def _DownloadObjectToFile(self, src_key, src_uri, dst_uri, headers,
- should_log=True):
- """Downloads an object to a local file.
-
- Args:
- src_key: Source Key.
- src_uri: Source StorageUri.
- dst_uri: Destination StorageUri.
- headers: The headers dictionary.
- should_log: bool indicator whether we should log this operation.
- Returns:
- (elapsed_time, bytes_transferred, dst_uri), excluding overhead like
- initial HEAD.
-
- Raises:
- CommandException: if errors encountered.
- """
- if should_log:
- self._LogCopyOperation(src_uri, dst_uri, headers)
- (cb, num_cb, res_download_handler) = self._GetTransferHandlers(
- dst_uri, src_key.size, False)
- file_name = dst_uri.object_name
- dir_name = os.path.dirname(file_name)
- if dir_name and not os.path.exists(dir_name):
- # Do dir creation in try block so can ignore case where dir already
- # exists. This is needed to avoid a race condition when running gsutil
- # -m cp.
- try:
- os.makedirs(dir_name)
- except OSError, e:
- if e.errno != errno.EEXIST:
- raise
- # For gzipped objects, download to a temp file and unzip.
- if (hasattr(src_key, 'content_encoding')
- and src_key.content_encoding == 'gzip'):
- # We can't use tempfile.mkstemp() here because we need a predictable
- # filename for resumable downloads.
- download_file_name = '%s_.gztmp' % file_name
- need_to_unzip = True
- else:
- download_file_name = file_name
- need_to_unzip = False
-
- hash_algs = self._GetHashAlgs(src_key)
-
- # Add accept encoding for download operation.
- AddAcceptEncoding(headers)
-
- fp = None
- try:
- if res_download_handler:
- fp = open(download_file_name, 'ab')
- else:
- fp = open(download_file_name, 'wb')
- start_time = time.time()
- # Use our hash_algs if get_contents_to_file() will accept them, else the
- # default (md5-only) will suffice.
- try:
- src_key.get_contents_to_file(fp, headers, cb=cb, num_cb=num_cb,
- res_download_handler=res_download_handler,
- hash_algs=hash_algs)
- except TypeError:
- src_key.get_contents_to_file(fp, headers, cb=cb, num_cb=num_cb,
- res_download_handler=res_download_handler)
-
- # If a custom test method is defined, call it here. For the copy command,
- # test methods are expected to take one argument: an open file pointer,
- # and are used to perturb the open file during download to exercise
- # download error detection.
- if self.test_method:
- self.test_method(fp)
- end_time = time.time()
- finally:
- if fp:
- fp.close()
-
- if (not need_to_unzip and
- hasattr(src_key, 'content_encoding')
- and src_key.content_encoding == 'gzip'):
- # TODO: HEAD requests are currently not returning proper Content-Encoding
- # headers when an object is gzip-encoded on-the-fly. Remove this once
- # it's fixed.
- renamed_file_name = '%s_.gztmp' % file_name
- os.rename(download_file_name, renamed_file_name)
- download_file_name = renamed_file_name
- need_to_unzip = True
-
- # Discard all hashes if we are resuming a partial download.
- if res_download_handler and res_download_handler.download_start_point:
- src_key.local_hashes = {}
-
- # Verify downloaded file checksum matched source object's checksum.
- digest_verified = True
- computed_hashes = None
- try:
- self._CheckHashes(src_key, download_file_name, hash_algs)
- except CommandException, e:
- # If the digest doesn't match, we'll try checking it again after
- # unzipping.
- if (not need_to_unzip or
- 'doesn\'t match cloud-supplied digest' not in str(e)):
- os.unlink(download_file_name)
- raise
- digest_verified = False
- computed_hashes = dict(
- (alg, digester())
- for alg, digester in self._GetHashAlgs(src_key).iteritems())
-
- if res_download_handler:
- bytes_transferred = (
- src_key.size - res_download_handler.download_start_point)
- else:
- bytes_transferred = src_key.size
-
- if need_to_unzip:
- # Log that we're uncompressing if the file is big enough that
- # decompressing would make it look like the transfer "stalled" at the end.
- if bytes_transferred > 10 * 1024 * 1024:
- self.logger.info('Uncompressing downloaded tmp file to %s...',
- file_name)
-
- # Downloaded gzipped file to a filename w/o .gz extension, so unzip.
- f_in = gzip.open(download_file_name, 'rb')
- with open(file_name, 'wb') as f_out:
- data = f_in.read(self.GUNZIP_CHUNK_SIZE)
- while data:
- f_out.write(data)
- if computed_hashes:
- # Compute digests again on the uncompressed data.
- for alg in computed_hashes.itervalues():
- alg.update(data)
- data = f_in.read(self.GUNZIP_CHUNK_SIZE)
- f_in.close()
-
- os.unlink(download_file_name)
-
- if not digest_verified:
- computed_hashes = dict((alg, digester.digest())
- for alg, digester in computed_hashes.iteritems())
- try:
- self._CheckHashes(
- src_key, file_name, hash_algs, computed_hashes=computed_hashes)
- except CommandException, e:
- os.unlink(file_name)
- raise
-
- return (end_time - start_time, bytes_transferred, dst_uri)
-
- def _PerformDownloadToStream(self, src_key, src_uri, str_fp, headers):
- (cb, num_cb, res_download_handler) = self._GetTransferHandlers(
- src_uri, src_key.size, False)
- start_time = time.time()
- src_key.get_contents_to_file(str_fp, headers, cb=cb, num_cb=num_cb)
- end_time = time.time()
- bytes_transferred = src_key.size
- end_time = time.time()
- return (end_time - start_time, bytes_transferred)
-
- def _CopyFileToFile(self, src_key, src_uri, dst_uri, headers):
- """Copies a local file to a local file.
-
- Args:
- src_key: Source StorageUri. Must be a file URI.
- src_uri: Source StorageUri.
- dst_uri: Destination StorageUri.
- headers: The headers dictionary.
- Returns:
- (elapsed_time, bytes_transferred, dst_uri), excluding
- overhead like initial HEAD.
-
- Raises:
- CommandException: if errors encountered.
- """
- self._LogCopyOperation(src_uri, dst_uri, headers)
- dst_key = dst_uri.new_key(False, headers)
- start_time = time.time()
- dst_key.set_contents_from_file(src_key.fp, headers)
- end_time = time.time()
- return (end_time - start_time, os.path.getsize(src_key.fp.name), dst_uri)
-
- def _CopyObjToObjDaisyChainMode(self, src_key, src_uri, dst_uri, headers):
- """Copies from src_uri to dst_uri in "daisy chain" mode.
- See -D OPTION documentation about what daisy chain mode is.
-
- Args:
- src_key: Source Key.
- src_uri: Source StorageUri.
- dst_uri: Destination StorageUri.
- headers: A copy of the top-level headers dictionary.
-
- Returns:
- (elapsed_time, bytes_transferred, version-specific dst_uri) excluding
- overhead like initial HEAD.
-
- Raises:
- CommandException: if errors encountered.
- """
- # Start with copy of input headers, so we'll include any headers that need
- # to be set from higher up in call stack (like x-goog-if-generation-match).
- headers = headers.copy()
- # Now merge headers from src_key so we'll preserve metadata.
- # Unfortunately boto separates headers into ones it puts in the metadata
- # dict and ones it pulls out into specific key fields, so we need to walk
- # through the latter list to find the headers that we copy over to the dest
- # object.
- for header_name, field_name in (
- ('cache-control', 'cache_control'),
- ('content-type', 'content_type'),
- ('content-language', 'content_language'),
- ('content-encoding', 'content_encoding'),
- ('content-disposition', 'content_disposition')):
- value = getattr(src_key, field_name, None)
- if value:
- headers[header_name] = value
- # Boto represents x-goog-meta-* headers in metadata dict with the
- # x-goog-meta- or x-amx-meta- prefix stripped. Turn these back into headers
- # for the destination object.
- for name, value in src_key.metadata.items():
- header_name = '%smeta-%s' % (dst_uri.get_provider().header_prefix, name)
- headers[header_name] = value
- # Set content type if specified in '-h Content-Type' option.
- self._SetContentTypeHeader(src_uri, headers)
- self._LogCopyOperation(src_uri, dst_uri, headers)
- (preserve_acl, canned_acl, headers) = (
- self._ProcessCopyObjectToObjectOptions(dst_uri, headers))
- if preserve_acl:
- if src_uri.get_provider() != dst_uri.get_provider():
- # We don't attempt to preserve ACLs across providers because
- # GCS and S3 support different ACLs and disjoint principals.
- raise NotImplementedError('Cross-provider cp -p not supported')
- # We need to read and write the ACL manually because the
- # Key.set_contents_from_file() API doesn't provide a preserve_acl
- # parameter (unlike the Bucket.copy_key() API used
- # by_CopyObjToObjInTheCloud).
- acl = src_uri.get_acl(headers=headers)
- fp = KeyFile(src_key)
- result = self._PerformResumableUploadIfApplies(fp, src_uri,
- dst_uri, canned_acl, headers,
- self._GetFileSize(fp))
- if preserve_acl:
- # If user specified noclobber flag, we need to remove the
- # x-goog-if-generation-match:0 header that was set when uploading the
- # object, because that precondition would fail when updating the ACL on
- # the now-existing object.
- if self.no_clobber:
- del headers['x-goog-if-generation-match']
- # Remove the owner field from the ACL in case we're copying from an object
- # that is owned by a different user. If we left that other user in the
- # ACL, attempting to set the ACL would result in a 400 (Bad Request).
- if hasattr(acl, 'owner'):
- del acl.owner
- dst_uri.set_acl(acl, dst_uri.object_name, headers=headers)
- return result
-
- def _PerformCopy(self, src_uri, dst_uri, allow_splitting=True):
- """Performs copy from src_uri to dst_uri, handling various special cases.
-
- Args:
- src_uri: Source StorageUri.
- dst_uri: Destination StorageUri.
- allow_splitting: Whether to allow the file to be split into component
- pieces for an parallel composite upload.
-
- Returns:
- (elapsed_time, bytes_transferred, version-specific dst_uri) excluding
- overhead like initial HEAD.
-
- Raises:
- CommandException: if errors encountered.
- """
- # Make a copy of the input headers each time so we can set a different
- # content type for each object.
- headers = self.headers.copy() if self.headers else {}
- download_headers = headers.copy()
- # Add accept encoding for download operation.
- AddAcceptEncoding(download_headers)
-
- src_key = src_uri.get_key(False, download_headers)
- if not src_key:
- raise CommandException('"%s" does not exist.' % src_uri)
-
- if self.use_manifest:
- # Set the source size in the manifest.
- self.manifest.Set(src_uri, 'size', getattr(src_key, 'size', None))
-
- # On Windows, stdin is opened as text mode instead of binary which causes
- # problems when piping a binary file, so this switches it to binary mode.
- if IS_WINDOWS and src_uri.is_file_uri() and src_key.is_stream():
- import msvcrt
- msvcrt.setmode(src_key.fp.fileno(), os.O_BINARY)
-
- if self.no_clobber:
- # There are two checks to prevent clobbering:
- # 1) The first check is to see if the item
- # already exists at the destination and prevent the upload/download
- # from happening. This is done by the exists() call.
- # 2) The second check is only relevant if we are writing to gs. We can
- # enforce that the server only writes the object if it doesn't exist
- # by specifying the header below. This check only happens at the
- # server after the complete file has been uploaded. We specify this
- # header to prevent a race condition where a destination file may
- # be created after the first check and before the file is fully
- # uploaded.
- # In order to save on unnecessary uploads/downloads we perform both
- # checks. However, this may come at the cost of additional HTTP calls.
- if dst_uri.exists(download_headers):
- if dst_uri.is_file_uri():
- # The local file may be a partial. Check the file sizes.
- if src_key.size == dst_uri.get_key(headers=download_headers).size:
- raise ItemExistsError()
- else:
- raise ItemExistsError()
- if dst_uri.is_cloud_uri() and dst_uri.scheme == 'gs':
- headers['x-goog-if-generation-match'] = '0'
-
- if src_uri.is_cloud_uri() and dst_uri.is_cloud_uri():
- if src_uri.scheme == dst_uri.scheme and not self.daisy_chain:
- return self._CopyObjToObjInTheCloud(src_key, src_uri, dst_uri, headers)
- else:
- return self._CopyObjToObjDaisyChainMode(src_key, src_uri, dst_uri,
- headers)
- elif src_uri.is_file_uri() and dst_uri.is_cloud_uri():
- return self._UploadFileToObject(src_key, src_uri, dst_uri,
- download_headers)
- elif src_uri.is_cloud_uri() and dst_uri.is_file_uri():
- return self._DownloadObjectToFile(src_key, src_uri, dst_uri,
- download_headers)
- elif src_uri.is_file_uri() and dst_uri.is_file_uri():
- return self._CopyFileToFile(src_key, src_uri, dst_uri, download_headers)
- else:
- raise CommandException('Unexpected src/dest case')
-
- def _PartitionFile(self, fp, file_size, src_uri, headers, canned_acl, bucket,
- random_prefix, tracker_file, tracker_file_lock):
- """Partitions a file into FilePart objects to be uploaded and later composed
- into an object matching the original file. This entails splitting the
- file into parts, naming and forming a destination URI for each part,
- and also providing the PerformResumableUploadIfAppliesArgs object
- corresponding to each part.
-
- Args:
- fp: The file object to be partitioned.
- file_size: The size of fp, in bytes.
- src_uri: The source StorageUri fromed from the original command.
- headers: The headers which ultimately passed to boto.
- canned_acl: The user-provided canned_acl, if applicable.
- bucket: The name of the destination bucket, of the form gs://bucket
- random_prefix: The randomly-generated prefix used to prevent collisions
- among the temporary component names.
- tracker_file: The path to the parallel composite upload tracker file.
- tracker_file_lock: The lock protecting access to the tracker file.
-
- Returns:
- dst_args: The destination URIs for the temporary component objects.
- """
- parallel_composite_upload_component_size = HumanReadableToBytes(
- boto.config.get('GSUtil', 'parallel_composite_upload_component_size',
- DEFAULT_PARALLEL_COMPOSITE_UPLOAD_COMPONENT_SIZE))
- (num_components, component_size) = _GetPartitionInfo(file_size,
- MAX_COMPOSE_ARITY, parallel_composite_upload_component_size)
-
- # Make sure that the temporary objects don't already exist.
- tmp_object_headers = copy.deepcopy(headers)
- tmp_object_headers['x-goog-if-generation-match'] = '0'
-
- uri_strs = [] # Used to create a NameExpansionIterator.
- dst_args = {} # Arguments to create commands and pass to subprocesses.
- file_names = [] # Used for the 2-step process of forming dst_args.
- for i in range(num_components):
- # "Salt" the object name with something a user is very unlikely to have
- # used in an object name, then hash the extended name to make sure
- # we don't run into problems with name length. Using a deterministic
- # naming scheme for the temporary components allows users to take
- # advantage of resumable uploads for each component.
- encoded_name = (PARALLEL_UPLOAD_STATIC_SALT + fp.name).encode('utf-8')
- content_md5 = md5()
- content_md5.update(encoded_name)
- digest = content_md5.hexdigest()
- temp_file_name = (random_prefix + PARALLEL_UPLOAD_TEMP_NAMESPACE +
- digest + '_' + str(i))
- tmp_dst_uri = MakeGsUri(bucket, temp_file_name, self.suri_builder)
-
- if i < (num_components - 1):
- # Every component except possibly the last is the same size.
- file_part_length = component_size
- else:
- # The last component just gets all of the remaining bytes.
- file_part_length = (file_size - ((num_components -1) * component_size))
- offset = i * component_size
- func_args = PerformResumableUploadIfAppliesArgs(
- fp.name, offset, file_part_length, src_uri, tmp_dst_uri, canned_acl,
- headers, tracker_file, tracker_file_lock)
- file_names.append(temp_file_name)
- dst_args[temp_file_name] = func_args
- uri_strs.append(self._MakeGsUriStr(bucket, temp_file_name))
-
- return dst_args
-
- def _MakeFileUri(self, filename):
- """Returns a StorageUri for a local file."""
- return self.suri_builder.StorageUri(filename)
-
- def _MakeGsUriStr(self, bucket, filename):
- """Returns a string of the form gs://bucket/filename, used to indicate an
- object in Google Cloud Storage.
- """
- return 'gs://' + bucket + '/' + filename
-
- def _DoParallelCompositeUpload(self, fp, src_uri, dst_uri, headers,
- canned_acl, file_size):
- """Uploads a local file to an object in the cloud for the Parallel Composite
- Uploads feature. The file is partitioned into parts, and then the parts
- are uploaded in parallel, composed to form the original destination
- object, and deleted.
-
- Args:
- fp: The file object to be uploaded.
- src_uri: The StorageURI of the local file.
- dst_uri: The StorageURI of the destination file.
- headers: The headers to pass to boto, if any.
- canned_acl: The canned acl to apply to the object, if any.
- file_size: The size of the source file in bytes.
- """
- start_time = time.time()
- gs_prefix = 'gs://'
- bucket = gs_prefix + dst_uri.bucket_name
- if 'content-type' in headers and not headers['content-type']:
- del headers['content-type']
-
- # Determine which components, if any, have already been successfully
- # uploaded.
- tracker_file = self._GetTrackerFilePath(dst_uri,
- TrackerFileType.PARALLEL_UPLOAD,
- src_uri)
- tracker_file_lock = CreateLock()
- (random_prefix, existing_components) = (
- _ParseParallelUploadTrackerFile(tracker_file, tracker_file_lock))
-
- # Create the initial tracker file for the upload.
- _CreateParallelUploadTrackerFile(tracker_file, random_prefix,
- existing_components, tracker_file_lock)
-
- # Get the set of all components that should be uploaded.
- dst_args = self._PartitionFile(fp, file_size, src_uri, headers, canned_acl,
- bucket, random_prefix, tracker_file,
- tracker_file_lock)
-
- (components_to_upload, existing_components, existing_objects_to_delete) = (
- FilterExistingComponents(dst_args, existing_components, bucket,
- self.suri_builder))
-
- # In parallel, copy all of the file parts that haven't already been
- # uploaded to temporary objects.
- cp_results = self.Apply(_PerformResumableUploadIfAppliesWrapper,
- components_to_upload,
- _CopyExceptionHandler,
- ('copy_failure_count', 'total_bytes_transferred'),
- arg_checker=gslib.command.DummyArgChecker,
- parallel_operations_override=True,
- should_return_results=True)
- uploaded_components = []
- total_bytes_uploaded = 0
- for cp_result in cp_results:
- total_bytes_uploaded += cp_result[1]
- uploaded_components.append(cp_result[2])
- components = uploaded_components + existing_components
-
- if len(components) == len(dst_args):
- # Only try to compose if all of the components were uploaded successfully.
-
- # Sort the components so that they will be composed in the correct order.
- components = sorted(
- components, key=lambda component:
- int(component.object_name[component.object_name.rfind('_')+1:]))
- result_uri = dst_uri.compose(components, headers=headers)
-
- try:
- # Make sure only to delete things that we know were successfully
- # uploaded (as opposed to all of the objects that we attempted to
- # create) so that we don't delete any preexisting objects, except for
- # those that were uploaded by a previous, failed run and have since
- # changed (but still have an old generation lying around).
- objects_to_delete = components + existing_objects_to_delete
- self.Apply(_DeleteKeyFn, objects_to_delete, _RmExceptionHandler,
- arg_checker=gslib.command.DummyArgChecker,
- parallel_operations_override=True)
- except Exception, e:
- if (e.message and ('unexpected failure in' in e.message)
- and ('sub-processes, aborting' in e.message)):
- # If some of the delete calls fail, don't cause the whole command to
- # fail. The copy was successful iff the compose call succeeded, so
- # just raise whatever exception (if any) happened before this instead,
- # and reduce this to a warning.
- logging.warning(
- 'Failed to delete some of the following temporary objects:\n' +
- '\n'.join(dst_args.keys()))
- else:
- raise e
- finally:
- with tracker_file_lock:
- if os.path.exists(tracker_file):
- os.unlink(tracker_file)
- else:
- # Some of the components failed to upload. In this case, we want to exit
- # without deleting the objects.
- raise CommandException(
- 'Some temporary components were not uploaded successfully. '
- 'Please retry this upload.')
-
- return (time.time() - start_time, total_bytes_uploaded, result_uri)
-
- def _ShouldDoParallelCompositeUpload(self, allow_splitting, src_key, dst_uri,
- file_size):
- """Returns True iff a parallel upload should be performed on the source key.
-
- Args:
- allow_splitting: If false, then this function returns false.
- src_key: Corresponding to a local file.
- dst_uri: Corresponding to an object in the cloud.
- file_size: The size of the source file, in bytes.
- """
- parallel_composite_upload_threshold = HumanReadableToBytes(boto.config.get(
- 'GSUtil', 'parallel_composite_upload_threshold',
- DEFAULT_PARALLEL_COMPOSITE_UPLOAD_THRESHOLD))
- return (allow_splitting # Don't split the pieces multiple times.
- and not src_key.is_stream() # We can't partition streams.
- and dst_uri.scheme == 'gs' # Compose is only for gs.
- and parallel_composite_upload_threshold > 0
- and file_size >= parallel_composite_upload_threshold
- and file_size >= MIN_PARALLEL_COMPOSITE_FILE_SIZE)
-
- def _ExpandDstUri(self, dst_uri_str):
- """
- Expands wildcard if present in dst_uri_str.
-
- Args:
- dst_uri_str: String representation of requested dst_uri.
-
- Returns:
- (exp_dst_uri, have_existing_dst_container)
- where have_existing_dst_container is a bool indicating whether
- exp_dst_uri names an existing directory, bucket, or bucket subdirectory.
-
- Raises:
- CommandException: if dst_uri_str matched more than 1 URI.
- """
- dst_uri = self.suri_builder.StorageUri(dst_uri_str)
-
- # Handle wildcarded dst_uri case.
- if ContainsWildcard(dst_uri):
- blr_expansion = list(self.WildcardIterator(dst_uri))
- if len(blr_expansion) != 1:
- raise CommandException('Destination (%s) must match exactly 1 URI' %
- dst_uri_str)
- blr = blr_expansion[0]
- uri = blr.GetUri()
- if uri.is_cloud_uri():
- return (uri, uri.names_bucket() or blr.HasPrefix()
- or blr.GetKey().name.endswith('/'))
- else:
- return (uri, uri.names_directory())
-
- # Handle non-wildcarded dst_uri:
- if dst_uri.is_file_uri():
- return (dst_uri, dst_uri.names_directory())
- if dst_uri.names_bucket():
- return (dst_uri, True)
- # For object URIs check 3 cases: (a) if the name ends with '/' treat as a
- # subdir; else, perform a wildcard expansion with dst_uri + "*" and then
- # find if (b) there's a Prefix matching dst_uri, or (c) name is of form
- # dir_$folder$ (and in both these cases also treat dir as a subdir).
- if dst_uri.is_cloud_uri() and dst_uri_str.endswith('/'):
- return (dst_uri, True)
- blr_expansion = list(self.WildcardIterator(
- '%s*' % dst_uri_str.rstrip(dst_uri.delim)))
- for blr in blr_expansion:
- if blr.GetRStrippedUriString().endswith('_$folder$'):
- return (dst_uri, True)
- if blr.GetRStrippedUriString() == dst_uri_str.rstrip(dst_uri.delim):
- return (dst_uri, blr.HasPrefix())
- return (dst_uri, False)
-
- def _ConstructDstUri(self, src_uri, exp_src_uri,
- src_uri_names_container, src_uri_expands_to_multi,
- have_multiple_srcs, exp_dst_uri,
- have_existing_dest_subdir):
- """
- Constructs the destination URI for a given exp_src_uri/exp_dst_uri pair,
- using context-dependent naming rules that mimic Linux cp and mv behavior.
-
- Args:
- src_uri: src_uri to be copied.
- exp_src_uri: Single StorageUri from wildcard expansion of src_uri.
- src_uri_names_container: True if src_uri names a container (including the
- case of a wildcard-named bucket subdir (like gs://bucket/abc,
- where gs://bucket/abc/* matched some objects). Note that this is
- additional semantics tha src_uri.names_container() doesn't understand
- because the latter only understands StorageUris, not wildcards.
- src_uri_expands_to_multi: True if src_uri expanded to multiple URIs.
- have_multiple_srcs: True if this is a multi-source request. This can be
- true if src_uri wildcard-expanded to multiple URIs or if there were
- multiple source URIs in the request.
- exp_dst_uri: the expanded StorageUri requested for the cp destination.
- Final written path is constructed from this plus a context-dependent
- variant of src_uri.
- have_existing_dest_subdir: bool indicator whether dest is an existing
- subdirectory.
-
- Returns:
- StorageUri to use for copy.
-
- Raises:
- CommandException if destination object name not specified for
- source and source is a stream.
- """
- if self._ShouldTreatDstUriAsSingleton(
- have_multiple_srcs, have_existing_dest_subdir, exp_dst_uri):
- # We're copying one file or object to one file or object.
- return exp_dst_uri
-
- if exp_src_uri.is_stream():
- if exp_dst_uri.names_container():
- raise CommandException('Destination object name needed when '
- 'source is a stream')
- return exp_dst_uri
-
- if not self.recursion_requested and not have_multiple_srcs:
- # We're copying one file or object to a subdirectory. Append final comp
- # of exp_src_uri to exp_dst_uri.
- src_final_comp = exp_src_uri.object_name.rpartition(src_uri.delim)[-1]
- return self.suri_builder.StorageUri('%s%s%s' % (
- exp_dst_uri.uri.rstrip(exp_dst_uri.delim), exp_dst_uri.delim,
- src_final_comp))
-
- # Else we're copying multiple sources to a directory, bucket, or a bucket
- # "sub-directory".
-
- # Ensure exp_dst_uri ends in delim char if we're doing a multi-src copy or
- # a copy to a directory. (The check for copying to a directory needs
- # special-case handling so that the command:
- # gsutil cp gs://bucket/obj dir
- # will turn into file://dir/ instead of file://dir -- the latter would cause
- # the file "dirobj" to be created.)
- # Note: need to check have_multiple_srcs or src_uri.names_container()
- # because src_uri could be a bucket containing a single object, named
- # as gs://bucket.
- if ((have_multiple_srcs or src_uri.names_container()
- or os.path.isdir(exp_dst_uri.object_name))
- and not exp_dst_uri.uri.endswith(exp_dst_uri.delim)):
- exp_dst_uri = exp_dst_uri.clone_replace_name(
- '%s%s' % (exp_dst_uri.object_name, exp_dst_uri.delim)
- )
-
- # Making naming behavior match how things work with local Linux cp and mv
- # operations depends on many factors, including whether the destination is a
- # container, the plurality of the source(s), and whether the mv command is
- # being used:
- # 1. For the "mv" command that specifies a non-existent destination subdir,
- # renaming should occur at the level of the src subdir, vs appending that
- # subdir beneath the dst subdir like is done for copying. For example:
- # gsutil rm -R gs://bucket
- # gsutil cp -R dir1 gs://bucket
- # gsutil cp -R dir2 gs://bucket/subdir1
- # gsutil mv gs://bucket/subdir1 gs://bucket/subdir2
- # would (if using cp naming behavior) end up with paths like:
- # gs://bucket/subdir2/subdir1/dir2/.svn/all-wcprops
- # whereas mv naming behavior should result in:
- # gs://bucket/subdir2/dir2/.svn/all-wcprops
- # 2. Copying from directories, buckets, or bucket subdirs should result in
- # objects/files mirroring the source directory hierarchy. For example:
- # gsutil cp dir1/dir2 gs://bucket
- # should create the object gs://bucket/dir2/file2, assuming dir1/dir2
- # contains file2).
- # To be consistent with Linux cp behavior, there's one more wrinkle when
- # working with subdirs: The resulting object names depend on whether the
- # destination subdirectory exists. For example, if gs://bucket/subdir
- # exists, the command:
- # gsutil cp -R dir1/dir2 gs://bucket/subdir
- # should create objects named like gs://bucket/subdir/dir2/a/b/c. In
- # contrast, if gs://bucket/subdir does not exist, this same command
- # should create objects named like gs://bucket/subdir/a/b/c.
- # 3. Copying individual files or objects to dirs, buckets or bucket subdirs
- # should result in objects/files named by the final source file name
- # component. Example:
- # gsutil cp dir1/*.txt gs://bucket
- # should create the objects gs://bucket/f1.txt and gs://bucket/f2.txt,
- # assuming dir1 contains f1.txt and f2.txt.
-
- if (self.perform_mv and self.recursion_requested
- and src_uri_expands_to_multi and not have_existing_dest_subdir):
- # Case 1. Handle naming rules for bucket subdir mv. Here we want to
- # line up the src_uri against its expansion, to find the base to build
- # the new name. For example, running the command:
- # gsutil mv gs://bucket/abcd gs://bucket/xyz
- # when processing exp_src_uri=gs://bucket/abcd/123
- # exp_src_uri_tail should become /123
- # Note: mv.py code disallows wildcard specification of source URI.
- exp_src_uri_tail = exp_src_uri.uri[len(src_uri.uri):]
- dst_key_name = '%s/%s' % (exp_dst_uri.object_name.rstrip('/'),
- exp_src_uri_tail.strip('/'))
- return exp_dst_uri.clone_replace_name(dst_key_name)
-
- if src_uri_names_container and not exp_dst_uri.names_file():
- # Case 2. Build dst_key_name from subpath of exp_src_uri past
- # where src_uri ends. For example, for src_uri=gs://bucket/ and
- # exp_src_uri=gs://bucket/src_subdir/obj, dst_key_name should be
- # src_subdir/obj.
- src_uri_path_sans_final_dir = _GetPathBeforeFinalDir(src_uri)
- if exp_src_uri.is_cloud_uri():
- dst_key_name = exp_src_uri.versionless_uri[
- len(src_uri_path_sans_final_dir):].lstrip(src_uri.delim)
- else:
- dst_key_name = exp_src_uri.uri[
- len(src_uri_path_sans_final_dir):].lstrip(src_uri.delim)
- # Handle case where dst_uri is a non-existent subdir.
- if not have_existing_dest_subdir:
- dst_key_name = dst_key_name.partition(src_uri.delim)[-1]
- # Handle special case where src_uri was a directory named with '.' or
- # './', so that running a command like:
- # gsutil cp -r . gs://dest
- # will produce obj names of the form gs://dest/abc instead of
- # gs://dest/./abc.
- if dst_key_name.startswith('.%s' % os.sep):
- dst_key_name = dst_key_name[2:]
-
- else:
- # Case 3.
- dst_key_name = exp_src_uri.object_name.rpartition(src_uri.delim)[-1]
-
- if (exp_dst_uri.is_file_uri()
- or self._ShouldTreatDstUriAsBucketSubDir(
- have_multiple_srcs, exp_dst_uri, have_existing_dest_subdir)):
- if exp_dst_uri.object_name.endswith(exp_dst_uri.delim):
- dst_key_name = '%s%s%s' % (
- exp_dst_uri.object_name.rstrip(exp_dst_uri.delim),
- exp_dst_uri.delim, dst_key_name)
- else:
- delim = exp_dst_uri.delim if exp_dst_uri.object_name else ''
- dst_key_name = '%s%s%s' % (exp_dst_uri.object_name, delim, dst_key_name)
-
- return exp_dst_uri.clone_replace_name(dst_key_name)
-
- def _FixWindowsNaming(self, src_uri, dst_uri):
- """
- Rewrites the destination URI built by _ConstructDstUri() to translate
- Windows pathnames to cloud pathnames if needed.
-
- Args:
- src_uri: Source URI to be copied.
- dst_uri: The destination URI built by _ConstructDstUri().
-
- Returns:
- StorageUri to use for copy.
- """
- if (src_uri.is_file_uri() and src_uri.delim == '\\'
- and dst_uri.is_cloud_uri()):
- trans_uri_str = re.sub(r'\\', '/', dst_uri.uri)
- dst_uri = self.suri_builder.StorageUri(trans_uri_str)
- return dst_uri
-
- def _CopyFunc(self, name_expansion_result):
+ # pylint: disable=too-many-statements
+ def CopyFunc(self, name_expansion_result, thread_state=None):
"""Worker function for performing the actual copy (and rm, for mv)."""
- (exp_dst_uri, have_existing_dst_container) = self._ExpandDstUri(
- self.args[-1])
- if self.perform_mv:
+ gsutil_api = GetCloudApiInstance(self, thread_state=thread_state)
+
+ copy_helper_opts = copy_helper.GetCopyHelperOpts()
+ if copy_helper_opts.perform_mv:
cmd_name = 'mv'
else:
cmd_name = self.command_name
- src_uri = self.suri_builder.StorageUri(
- name_expansion_result.GetSrcUriStr())
- exp_src_uri = self.suri_builder.StorageUri(
- name_expansion_result.GetExpandedUriStr())
- src_uri_names_container = name_expansion_result.NamesContainer()
- src_uri_expands_to_multi = name_expansion_result.NamesContainer()
- have_multiple_srcs = name_expansion_result.IsMultiSrcRequest()
- have_existing_dest_subdir = (
- name_expansion_result.HaveExistingDstContainer())
- if src_uri.names_provider():
+ src_url = name_expansion_result.source_storage_url
+ exp_src_url = name_expansion_result.expanded_storage_url
+ src_url_names_container = name_expansion_result.names_container
+ have_multiple_srcs = name_expansion_result.is_multi_source_request
+
+ if src_url.IsCloudUrl() and src_url.IsProvider():
raise CommandException(
- 'The %s command does not allow provider-only source URIs (%s)' %
- (cmd_name, src_uri))
+ 'The %s command does not allow provider-only source URLs (%s)' %
+ (cmd_name, src_url))
if have_multiple_srcs:
- self._InsistDstUriNamesContainer(exp_dst_uri,
- have_existing_dst_container,
- cmd_name)
+ copy_helper.InsistDstUrlNamesContainer(
+ self.exp_dst_url, self.have_existing_dst_container, cmd_name)
+ # Various GUI tools (like the GCS web console) create placeholder objects
+ # ending with '/' when the user creates an empty directory. Normally these
+ # tools should delete those placeholders once objects have been written
+ # "under" the directory, but sometimes the placeholders are left around. We
+ # need to filter them out here, otherwise if the user tries to rsync from
+ # GCS to a local directory it will result in a directory/file conflict
+ # (e.g., trying to download an object called "mydata/" where the local
+ # directory "mydata" exists).
+ if IsCloudSubdirPlaceholder(exp_src_url):
+ self.logger.info('Skipping cloud sub-directory placeholder object %s',
+ exp_src_url)
+ return
- if self.use_manifest and self.manifest.WasSuccessful(str(exp_src_uri)):
+ if copy_helper_opts.use_manifest and self.manifest.WasSuccessful(
+ exp_src_url.url_string):
return
- if self.perform_mv:
- if name_expansion_result.NamesContainer():
+ if copy_helper_opts.perform_mv:
+ if name_expansion_result.names_container:
# Use recursion_requested when performing name expansion for the
- # directory mv case so we can determine if any of the source URIs are
+ # directory mv case so we can determine if any of the source URLs are
# directories (and then use cp -R and rm -R to perform the move, to
# match the behavior of Linux mv (which when moving a directory moves
# all the contained files).
self.recursion_requested = True
- # Disallow wildcard src URIs when moving directories, as supporting it
+ # Disallow wildcard src URLs when moving directories, as supporting it
# would make the name transformation too complex and would also be
# dangerous (e.g., someone could accidentally move many objects to the
# wrong name, or accidentally overwrite many objects).
- if ContainsWildcard(src_uri):
+ if ContainsWildcard(src_url.url_string):
raise CommandException('The mv command disallows naming source '
'directories using wildcards')
- if (exp_dst_uri.is_file_uri()
- and not os.path.exists(exp_dst_uri.object_name)
+ if (self.exp_dst_url.IsFileUrl()
+ and not os.path.exists(self.exp_dst_url.object_name)
and have_multiple_srcs):
- os.makedirs(exp_dst_uri.object_name)
+ os.makedirs(self.exp_dst_url.object_name)
- dst_uri = self._ConstructDstUri(src_uri, exp_src_uri,
- src_uri_names_container,
- src_uri_expands_to_multi,
- have_multiple_srcs, exp_dst_uri,
- have_existing_dest_subdir)
- dst_uri = self._FixWindowsNaming(src_uri, dst_uri)
+ dst_url = copy_helper.ConstructDstUrl(
+ src_url, exp_src_url, src_url_names_container, have_multiple_srcs,
+ self.exp_dst_url, self.have_existing_dst_container,
+ self.recursion_requested)
+ dst_url = copy_helper.FixWindowsNaming(src_url, dst_url)
- self._CheckForDirFileConflict(exp_src_uri, dst_uri)
- if self._SrcDstSame(exp_src_uri, dst_uri):
+ copy_helper.CheckForDirFileConflict(exp_src_url, dst_url)
+ if copy_helper.SrcDstSame(exp_src_url, dst_url):
raise CommandException('%s: "%s" and "%s" are the same file - '
- 'abort.' % (cmd_name, exp_src_uri, dst_uri))
+ 'abort.' % (cmd_name, exp_src_url, dst_url))
- if dst_uri.is_cloud_uri() and dst_uri.is_version_specific:
- raise CommandException('%s: a version-specific URI\n(%s)\ncannot be '
+ if dst_url.IsCloudUrl() and dst_url.HasGeneration():
+ raise CommandException('%s: a version-specific URL\n(%s)\ncannot be '
'the destination for gsutil cp - abort.'
- % (cmd_name, dst_uri))
+ % (cmd_name, dst_url))
elapsed_time = bytes_transferred = 0
try:
- if self.use_manifest:
- self.manifest.Initialize(exp_src_uri, dst_uri)
- (elapsed_time, bytes_transferred, result_uri) = (
- self._PerformCopy(exp_src_uri, dst_uri))
- if self.use_manifest:
- if hasattr(dst_uri, 'md5'):
- self.manifest.Set(exp_src_uri, 'md5', dst_uri.md5)
- self.manifest.SetResult(exp_src_uri, bytes_transferred, 'OK')
+ if copy_helper_opts.use_manifest:
+ self.manifest.Initialize(
+ exp_src_url.url_string, dst_url.url_string)
+ (elapsed_time, bytes_transferred, result_url, md5) = (
+ copy_helper.PerformCopy(
+ self.logger, exp_src_url, dst_url, gsutil_api,
+ self, _CopyExceptionHandler, allow_splitting=True,
+ headers=self.headers, manifest=self.manifest,
+ gzip_exts=self.gzip_exts, test_method=self.test_method))
+ if copy_helper_opts.use_manifest:
+ if md5:
+ self.manifest.Set(exp_src_url.url_string, 'md5', md5)
+ self.manifest.SetResult(
+ exp_src_url.url_string, bytes_transferred, 'OK')
+ if copy_helper_opts.print_ver:
+ # Some cases don't return a version-specific URL (e.g., if destination
+ # is a file).
+ self.logger.info('Created: %s', result_url)
except ItemExistsError:
- message = 'Skipping existing item: %s' % dst_uri.uri
+ message = 'Skipping existing item: %s' % dst_url
self.logger.info(message)
- if self.use_manifest:
- self.manifest.SetResult(exp_src_uri, 0, 'skip', message)
+ if copy_helper_opts.use_manifest:
+ self.manifest.SetResult(exp_src_url.url_string, 0, 'skip', message)
except Exception, e:
- if self._IsNoClobberServerException(e):
- message = 'Rejected (noclobber): %s' % dst_uri.uri
+ if (copy_helper_opts.no_clobber and
+ copy_helper.IsNoClobberServerException(e)):
+ message = 'Rejected (noclobber): %s' % dst_url
self.logger.info(message)
- if self.use_manifest:
- self.manifest.SetResult(exp_src_uri, 0, 'skip', message)
+ if copy_helper_opts.use_manifest:
+ self.manifest.SetResult(
+ exp_src_url.url_string, 0, 'skip', message)
elif self.continue_on_error:
- message = 'Error copying %s: %s' % (src_uri.uri, str(e))
- self.copy_failure_count += 1
+ message = 'Error copying %s: %s' % (src_url, str(e))
+ self.op_failure_count += 1
self.logger.error(message)
- if self.use_manifest:
- self.manifest.SetResult(exp_src_uri, 0, 'error', message)
+ if copy_helper_opts.use_manifest:
+ self.manifest.SetResult(
+ exp_src_url.url_string, 0, 'error',
+ RemoveCRLFFromString(message))
else:
- if self.use_manifest:
- self.manifest.SetResult(exp_src_uri, 0, 'error', str(e))
+ if copy_helper_opts.use_manifest:
+ self.manifest.SetResult(
+ exp_src_url.url_string, 0, 'error', str(e))
raise
- if self.print_ver:
- # Some cases don't return a version-specific URI (e.g., if destination
- # is a file).
- if hasattr(result_uri, 'version_specific_uri'):
- self.logger.info('Created: %s' % result_uri.version_specific_uri)
- else:
- self.logger.info('Created: %s' % result_uri.uri)
-
# TODO: If we ever use -n (noclobber) with -M (move) (not possible today
# since we call copy internally from move and don't specify the -n flag)
# we'll need to only remove the source when we have not skipped the
# destination.
- if self.perform_mv:
- self.logger.info('Removing %s...', exp_src_uri)
- exp_src_uri.delete_key(validate=False, headers=self.headers)
+ if copy_helper_opts.perform_mv:
+ self.logger.info('Removing %s...', exp_src_url)
+ if exp_src_url.IsCloudUrl():
+ gsutil_api.DeleteObject(exp_src_url.bucket_name,
+ exp_src_url.object_name,
+ generation=exp_src_url.generation,
+ provider=exp_src_url.scheme)
+ else:
+ os.unlink(exp_src_url.object_name)
+
with self.stats_lock:
self.total_elapsed_time += elapsed_time
self.total_bytes_transferred += bytes_transferred
# Command entry point.
def RunCommand(self):
- self._ParseArgs()
+ copy_helper_opts = self._ParseOpts()
self.total_elapsed_time = self.total_bytes_transferred = 0
if self.args[-1] == '-' or self.args[-1] == 'file://-':
- self._HandleStreamingDownload()
- return 0
+ return CatHelper(self).CatUrlStrings(self.args[:-1])
- if self.read_args_from_stdin:
+ if copy_helper_opts.read_args_from_stdin:
if len(self.args) != 1:
- raise CommandException('Source URIs cannot be specified with -I option')
- uri_strs = self._StdinIterator()
+ raise CommandException('Source URLs cannot be specified with -I option')
+ url_strs = copy_helper.StdinIterator()
else:
if len(self.args) < 2:
raise CommandException('Wrong number of arguments for "cp" command.')
- uri_strs = self.args[:-1]
+ url_strs = self.args[:-1]
- (exp_dst_uri, have_existing_dst_container) = self._ExpandDstUri(
- self.args[-1])
+ (self.exp_dst_url, self.have_existing_dst_container) = (
+ copy_helper.ExpandUrlToSingleBlr(self.args[-1], self.gsutil_api,
+ self.debug, self.project_id))
+
# If the destination bucket has versioning enabled iterate with
# all_versions=True. That way we'll copy all versions if the source bucket
# is versioned; and by leaving all_versions=False if the destination bucket
# has versioning disabled we will avoid copying old versions all to the same
# un-versioned destination object.
+ all_versions = False
try:
- all_versions = (exp_dst_uri.names_bucket()
- and exp_dst_uri.get_versioning_config(self.headers))
- except GSResponseError as e:
- # This happens if the user doesn't have OWNER access on the bucket (needed
- # to check if versioning is enabled). In this case fall back to copying
- # all versions (which can be inefficient for the reason noted in the
- # comment above). We don't try to warn the user because that would result
- # in false positive warnings (since we can't check if versioning is
- # enabled on the destination bucket).
- if e.status == 403:
+ bucket = self._GetBucketWithVersioningConfig(self.exp_dst_url)
+ if bucket and bucket.versioning and bucket.versioning.enabled:
all_versions = True
- else:
- raise
+ except AccessDeniedException:
+ # This happens (in the XML API only) if the user doesn't have OWNER access
+ # on the bucket (needed to check if versioning is enabled). In this case
+ # fall back to copying all versions (which can be inefficient for the
+ # reason noted in the comment above). We don't try to warn the user
+ # because that would result in false positive warnings (since we can't
+ # check if versioning is enabled on the destination bucket).
+ #
+ # For JSON, we will silently not return versioning if we don't have
+ # access.
+ all_versions = True
+
name_expansion_iterator = NameExpansionIterator(
- self.command_name, self.proj_id_handler, self.headers, self.debug,
- self.logger, self.bucket_storage_uri_class, uri_strs,
- self.recursion_requested or self.perform_mv,
- have_existing_dst_container=have_existing_dst_container,
- all_versions=all_versions)
- self.have_existing_dst_container = have_existing_dst_container
+ self.command_name, self.debug,
+ self.logger, self.gsutil_api, url_strs,
+ self.recursion_requested or copy_helper_opts.perform_mv,
+ project_id=self.project_id, all_versions=all_versions,
+ continue_on_error=self.continue_on_error or self.parallel_operations)
# Use a lock to ensure accurate statistics in the face of
# multi-threading/multi-processing.
self.stats_lock = CreateLock()
# Tracks if any copies failed.
- self.copy_failure_count = 0
+ self.op_failure_count = 0
# Start the clock.
start_time = time.time()
# Tuple of attributes to share/manage across multiple processes in
# parallel (-m) mode.
- shared_attrs = ('copy_failure_count', 'total_bytes_transferred')
+ shared_attrs = ('op_failure_count', 'total_bytes_transferred')
# Perform copy requests in parallel (-m) mode, if requested, using
# configured number of parallel processes and threads. Otherwise,
# perform requests with sequential function calls in current process.
self.Apply(_CopyFuncWrapper, name_expansion_iterator,
- _CopyExceptionHandler, shared_attrs, fail_on_error=True)
+ _CopyExceptionHandler, shared_attrs,
+ fail_on_error=(not self.continue_on_error))
self.logger.debug(
'total_bytes_transferred: %d', self.total_bytes_transferred)
@@ -2292,621 +833,122 @@
if self.debug == 3:
# Note that this only counts the actual GET and PUT bytes for the copy
- # - not any transfers for doing wildcard expansion, the initial HEAD
- # request boto performs when doing a bucket.get_key() operation, etc.
+ # - not any transfers for doing wildcard expansion, the initial
+ # HEAD/GET request performed to get the object metadata, etc.
if self.total_bytes_transferred != 0:
self.logger.info(
'Total bytes copied=%d, total elapsed time=%5.3f secs (%sps)',
self.total_bytes_transferred, self.total_elapsed_time,
MakeHumanReadable(self.total_bytes_per_second))
- if self.copy_failure_count:
- plural_str = ''
- if self.copy_failure_count > 1:
- plural_str = 's'
+ if self.op_failure_count:
+ plural_str = 's' if self.op_failure_count > 1 else ''
raise CommandException('%d file%s/object%s could not be transferred.' % (
- self.copy_failure_count, plural_str, plural_str))
+ self.op_failure_count, plural_str, plural_str))
return 0
- def _ParseArgs(self):
- self.perform_mv = False
+ def _ParseOpts(self):
+ perform_mv = False
+ # exclude_symlinks is handled by Command parent class, so save in Command
+ # state rather than CopyHelperOpts.
self.exclude_symlinks = False
- self.no_clobber = False
+ no_clobber = False
+ # continue_on_error is handled by Command parent class, so save in Command
+ # state rather than CopyHelperOpts.
self.continue_on_error = False
- self.daisy_chain = False
- self.read_args_from_stdin = False
- self.print_ver = False
- self.use_manifest = False
+ daisy_chain = False
+ read_args_from_stdin = False
+ print_ver = False
+ use_manifest = False
+ preserve_acl = False
+ canned_acl = None
+ # canned_acl is handled by a helper function in parent
+ # Command class, so save in Command state rather than CopyHelperOpts.
+ self.canned = None
+ # Files matching these extensions should be gzipped before uploading.
+ self.gzip_exts = []
+
+ # Test hook for stopping transfers.
+ halt_at_byte = None
+
# self.recursion_requested initialized in command.py (so can be checked
# in parent class for all commands).
+ self.manifest = None
if self.sub_opts:
for o, a in self.sub_opts:
+ if o == '-a':
+ canned_acl = a
+ self.canned = True
if o == '-c':
self.continue_on_error = True
elif o == '-D':
- self.daisy_chain = True
+ daisy_chain = True
elif o == '-e':
self.exclude_symlinks = True
+ elif o == '--haltatbyte':
+ halt_at_byte = long(a)
elif o == '-I':
- self.read_args_from_stdin = True
+ read_args_from_stdin = True
elif o == '-L':
- self.use_manifest = True
- self.manifest = _Manifest(a)
+ use_manifest = True
+ self.manifest = Manifest(a)
elif o == '-M':
# Note that we signal to the cp command to perform a move (copy
# followed by remove) and use directory-move naming rules by passing
# the undocumented (for internal use) -M option when running the cp
# command from mv.py.
- self.perform_mv = True
+ perform_mv = True
elif o == '-n':
- self.no_clobber = True
- elif o == '-q':
- self.logger.warning(
- 'Warning: gsutil cp -q is deprecated, and will be removed in the '
- 'future.\nPlease use gsutil -q cp ... instead.')
- self.logger.setLevel(level=logging.WARNING)
+ no_clobber = True
+ elif o == '-p':
+ preserve_acl = True
elif o == '-r' or o == '-R':
self.recursion_requested = True
elif o == '-v':
- self.print_ver = True
+ print_ver = True
+ elif o == '-z':
+ self.gzip_exts = [x.strip() for x in a.split(',')]
+ if preserve_acl and canned_acl:
+ raise CommandException(
+ 'Specifying both the -p and -a options together is invalid.')
+ return CreateCopyHelperOpts(
+ perform_mv=perform_mv,
+ no_clobber=no_clobber,
+ daisy_chain=daisy_chain,
+ read_args_from_stdin=read_args_from_stdin,
+ print_ver=print_ver,
+ use_manifest=use_manifest,
+ preserve_acl=preserve_acl,
+ canned_acl=canned_acl,
+ halt_at_byte=halt_at_byte)
- def _HandleStreamingDownload(self):
- # Destination is <STDOUT>. Manipulate sys.stdout so as to redirect all
- # debug messages to <STDERR>.
- stdout_fp = sys.stdout
- sys.stdout = sys.stderr
- did_some_work = False
- for uri_str in self.args[0:len(self.args)-1]:
- for uri in self.WildcardIterator(uri_str).IterUris():
- did_some_work = True
- key = uri.get_key(False, self.headers)
- (elapsed_time, bytes_transferred) = self._PerformDownloadToStream(
- key, uri, stdout_fp, self.headers)
- self.total_elapsed_time += elapsed_time
- self.total_bytes_transferred += bytes_transferred
- if not did_some_work:
- raise CommandException('No URIs matched')
- if self.debug == 3:
- if self.total_bytes_transferred != 0:
- self.logger.info(
- 'Total bytes copied=%d, total elapsed time=%5.3f secs (%sps)',
- self.total_bytes_transferred, self.total_elapsed_time,
- MakeHumanReadable(float(self.total_bytes_transferred) /
- float(self.total_elapsed_time)))
+ def _GetBucketWithVersioningConfig(self, exp_dst_url):
+ """Gets versioning config for a bucket and ensures that it exists.
- def _StdinIterator(self):
- """A generator function that returns lines from stdin."""
- for line in sys.stdin:
- # Strip CRLF.
- yield line.rstrip()
-
- def _SrcDstSame(self, src_uri, dst_uri):
- """Checks if src_uri and dst_uri represent the same object or file.
-
- We don't handle anything about hard or symbolic links.
-
Args:
- src_uri: Source StorageUri.
- dst_uri: Destination StorageUri.
+ exp_dst_url: Wildcard-expanded destination StorageUrl.
- Returns:
- Bool indicator.
- """
- if src_uri.is_file_uri() and dst_uri.is_file_uri():
- # Translate a/b/./c to a/b/c, so src=dst comparison below works.
- new_src_path = os.path.normpath(src_uri.object_name)
- new_dst_path = os.path.normpath(dst_uri.object_name)
- return (src_uri.clone_replace_name(new_src_path).uri ==
- dst_uri.clone_replace_name(new_dst_path).uri)
- else:
- return (src_uri.uri == dst_uri.uri and
- src_uri.generation == dst_uri.generation and
- src_uri.version_id == dst_uri.version_id)
+ Raises:
+ AccessDeniedException: if there was a permissions problem accessing the
+ bucket or its versioning config.
+ CommandException: if URL refers to a cloud bucket that does not exist.
- def _ShouldTreatDstUriAsBucketSubDir(self, have_multiple_srcs, dst_uri,
- have_existing_dest_subdir):
- """
- Checks whether dst_uri should be treated as a bucket "sub-directory". The
- decision about whether something constitutes a bucket "sub-directory"
- depends on whether there are multiple sources in this request and whether
- there is an existing bucket subdirectory. For example, when running the
- command:
- gsutil cp file gs://bucket/abc
- if there's no existing gs://bucket/abc bucket subdirectory we should copy
- file to the object gs://bucket/abc. In contrast, if
- there's an existing gs://bucket/abc bucket subdirectory we should copy
- file to gs://bucket/abc/file. And regardless of whether gs://bucket/abc
- exists, when running the command:
- gsutil cp file1 file2 gs://bucket/abc
- we should copy file1 to gs://bucket/abc/file1 (and similarly for file2).
-
- Note that we don't disallow naming a bucket "sub-directory" where there's
- already an object at that URI. For example it's legitimate (albeit
- confusing) to have an object called gs://bucket/dir and
- then run the command
- gsutil cp file1 file2 gs://bucket/dir
- Doing so will end up with objects gs://bucket/dir, gs://bucket/dir/file1,
- and gs://bucket/dir/file2.
-
- Args:
- have_multiple_srcs: Bool indicator of whether this is a multi-source
- operation.
- dst_uri: StorageUri to check.
- have_existing_dest_subdir: bool indicator whether dest is an existing
- subdirectory.
-
Returns:
- bool indicator.
+ apitools Bucket with versioning configuration.
"""
- return ((have_multiple_srcs and dst_uri.is_cloud_uri())
- or (have_existing_dest_subdir))
-
- def _ShouldTreatDstUriAsSingleton(self, have_multiple_srcs,
- have_existing_dest_subdir, dst_uri):
- """
- Checks that dst_uri names a singleton (file or object) after
- dir/wildcard expansion. The decision is more nuanced than simply
- dst_uri.names_singleton()) because of the possibility that an object path
- might name a bucket sub-directory.
-
- Args:
- have_multiple_srcs: Bool indicator of whether this is a multi-source
- operation.
- have_existing_dest_subdir: bool indicator whether dest is an existing
- subdirectory.
- dst_uri: StorageUri to check.
-
- Returns:
- bool indicator.
- """
- if have_multiple_srcs:
- # Only a file meets the criteria in this case.
- return dst_uri.names_file()
- return not have_existing_dest_subdir and dst_uri.names_singleton()
-
- def _IsNoClobberServerException(self, e):
- """
- Checks to see if the server attempted to clobber a file after we specified
- in the header that we didn't want the file clobbered.
-
- Args:
- e: The Exception that was generated by a failed copy operation
-
- Returns:
- bool indicator - True indicates that the server did attempt to clobber
- an existing file.
- """
- return self.no_clobber and (
- (isinstance(e, GSResponseError) and e.status==412) or
- (isinstance(e, ResumableUploadException) and 'code 412' in e.message))
-
-
-class _Manifest(object):
- """Stores the manifest items for the CpCommand class."""
-
- def __init__(self, path):
- # self.items contains a dictionary of rows
- self.items = {}
- self.manifest_filter = {}
- self.lock = multiprocessing.Manager().Lock()
-
- self.manifest_path = os.path.expanduser(path)
- self._ParseManifest()
- self._CreateManifestFile()
-
- def _ParseManifest(self):
- """
- Load and parse a manifest file. This information will be used to skip
- any files that have a skip or OK status.
- """
- try:
- if os.path.exists(self.manifest_path):
- with open(self.manifest_path, 'rb') as f:
- first_row = True
- reader = csv.reader(f)
- for row in reader:
- if first_row:
- try:
- source_index = row.index('Source')
- result_index = row.index('Result')
- except ValueError:
- # No header and thus not a valid manifest file.
- raise CommandException(
- 'Missing headers in manifest file: %s' % self.manifest_path)
- first_row = False
- source = row[source_index]
- result = row[result_index]
- if result in ['OK', 'skip']:
- # We're always guaranteed to take the last result of a specific
- # source uri.
- self.manifest_filter[source] = result
- except IOError as ex:
- raise CommandException('Could not parse %s' % path)
-
- def WasSuccessful(self, src):
- """ Returns whether the specified src uri was marked as successful."""
- return src in self.manifest_filter
-
- def _CreateManifestFile(self):
- """Opens the manifest file and assigns it to the file pointer."""
- try:
- if ((not os.path.exists(self.manifest_path))
- or (os.stat(self.manifest_path).st_size == 0)):
- # Add headers to the new file.
- with open(self.manifest_path, 'wb', 1) as f:
- writer = csv.writer(f)
- writer.writerow(['Source',
- 'Destination',
- 'Start',
- 'End',
- 'Md5',
- 'UploadId',
- 'Source Size',
- 'Bytes Transferred',
- 'Result',
- 'Description'])
- except IOError:
- raise CommandException('Could not create manifest file.')
-
- def Set(self, uri, key, value):
- if value is None:
- # In case we don't have any information to set we bail out here.
- # This is so that we don't clobber existing information.
- # To zero information pass '' instead of None.
- return
- if uri in self.items:
- self.items[uri][key] = value
- else:
- self.items[uri] = {key:value}
-
- def Initialize(self, source_uri, destination_uri):
- # Always use the source_uri as the key for the item. This is unique.
- self.Set(source_uri, 'source_uri', source_uri)
- self.Set(source_uri, 'destination_uri', destination_uri)
- self.Set(source_uri, 'start_time', datetime.datetime.utcnow())
-
- def SetResult(self, source_uri, bytes_transferred, result,
- description=''):
- self.Set(source_uri, 'bytes', bytes_transferred)
- self.Set(source_uri, 'result', result)
- self.Set(source_uri, 'description', description)
- self.Set(source_uri, 'end_time', datetime.datetime.utcnow())
- self._WriteRowToManifestFile(source_uri)
- self._RemoveItemFromManifest(source_uri)
-
- def _WriteRowToManifestFile(self, uri):
- row_item = self.items[uri]
- data = [
- str(row_item['source_uri']),
- str(row_item['destination_uri']),
- '%sZ' % row_item['start_time'].isoformat(),
- '%sZ' % row_item['end_time'].isoformat(),
- row_item['md5'] if 'md5' in row_item else '',
- row_item['upload_id'] if 'upload_id' in row_item else '',
- str(row_item['size']) if 'size' in row_item else '',
- str(row_item['bytes']) if 'bytes' in row_item else '',
- row_item['result'],
- row_item['description']]
-
- # Aquire a lock to prevent multiple threads writing to the same file at
- # the same time. This would cause a garbled mess in the manifest file.
- with self.lock:
- with open(self.manifest_path, 'a', 1) as f: # 1 == line buffered
- writer = csv.writer(f)
- writer.writerow(data)
-
- def _RemoveItemFromManifest(self, uri):
- # Remove the item from the dictionary since we're done with it and
- # we don't want the dictionary to grow too large in memory for no good
- # reason.
- del self.items[uri]
-
-
-class ItemExistsError(Exception):
- """Exception class for objects that are skipped because they already exist."""
- pass
-
-
-def _GetPathBeforeFinalDir(uri):
- """
- Returns the part of the path before the final directory component for the
- given URI, handling cases for file system directories, bucket, and bucket
- subdirectories. Example: for gs://bucket/dir/ we'll return 'gs://bucket',
- and for file://dir we'll return file://
-
- Args:
- uri: StorageUri.
-
- Returns:
- String name of above-described path, sans final path separator.
- """
- sep = uri.delim
- # If the source uri argument had a wildcard and wasn't expanded by the
- # shell, then uri.names_file() will always be true, so we check for
- # this case explicitly.
- assert ((not uri.names_file()) or ContainsWildcard(uri.object_name))
- if uri.names_directory():
- past_scheme = uri.uri[len('file://'):]
- if past_scheme.find(sep) == -1:
- return 'file://'
- else:
- return 'file://%s' % past_scheme.rstrip(sep).rpartition(sep)[0]
- if uri.names_bucket():
- return '%s://' % uri.scheme
- # Else it names a bucket subdir.
- return uri.uri.rstrip(sep).rpartition(sep)[0]
-
-def _HashFilename(filename):
- """
- Apply a hash function (SHA1) to shorten the passed file name. The spec
- for the hashed file name is as follows:
-
- TRACKER_<hash>_<trailing>
-
- where hash is a SHA1 hash on the original file name and trailing is
- the last 16 chars from the original file name. Max file name lengths
- vary by operating system so the goal of this function is to ensure
- the hashed version takes fewer than 100 characters.
-
- Args:
- filename: file name to be hashed.
-
- Returns:
- shorter, hashed version of passed file name
- """
- if isinstance(filename, unicode):
- filename = filename.encode('utf-8')
- else:
- filename = unicode(filename, 'utf8').encode('utf-8')
- m = hashlib.sha1(filename)
- return "TRACKER_" + m.hexdigest() + '.' + filename[-16:]
-
-def _DivideAndCeil(dividend, divisor):
- """Returns ceil(dividend / divisor), taking care to avoid the pitfalls of
- floating point arithmetic that could otherwise yield the wrong result
- for large numbers.
- """
- quotient = dividend // divisor
- if (dividend % divisor) != 0:
- quotient += 1
- return quotient
-
-def _GetPartitionInfo(file_size, max_components, default_component_size):
- """
- Args:
- file_size: The number of bytes in the file to be partitioned.
- max_components: The maximum number of components that can be composed.
- default_component_size: The size of a component, assuming that
- max_components is infinite.
- Returns:
- The number of components in the partitioned file, and the size of each
- component (except the last, which will have a different size iff
- file_size != 0 (mod num_components)).
- """
- # num_components = ceil(file_size / default_component_size)
- num_components = _DivideAndCeil(file_size, default_component_size)
-
- # num_components must be in the range [2, max_components]
- num_components = max(min(num_components, max_components), 2)
-
- # component_size = ceil(file_size / num_components)
- component_size = _DivideAndCeil(file_size, num_components)
- return (num_components, component_size)
-
-def _DeleteKeyFn(cls, key):
- """Wrapper function to be used with command.Apply()."""
- return key.delete_key()
-
-def _ParseParallelUploadTrackerFile(tracker_file, tracker_file_lock):
- """Parse the tracker file (if any) from the last parallel composite upload
- attempt. The tracker file is of the format described in
- _CreateParallelUploadTrackerFile. If the file doesn't exist or cannot be
- read, then the upload will start from the beginning.
-
- Args:
- tracker_file: The name of the file to parse.
- tracker_file_lock: Lock protecting access to the tracker file.
-
- Returns:
- random_prefix: A randomly-generated prefix to the name of the
- temporary components.
- existing_objects: A list of ObjectFromTracker objects representing
- the set of files that have already been uploaded.
- """
- existing_objects = []
- try:
- with tracker_file_lock:
- f = open(tracker_file, 'r')
- lines = f.readlines()
- lines = [line.strip() for line in lines]
- f.close()
- except IOError as e:
- # We can't read the tracker file, so generate a new random prefix.
- lines = [str(random.randint(1, (10 ** 10) - 1))]
-
- # Ignore non-existent file (happens first time an upload
- # is attempted on a file), but warn user for other errors.
- if e.errno != errno.ENOENT:
- # Will restart because we failed to read in the file.
- print('Couldn\'t read parallel upload tracker file (%s): %s. '
- 'Restarting upload from scratch.' % (tracker_file, e.strerror))
-
- # The first line contains the randomly-generated prefix.
- random_prefix = lines[0]
-
- # The remaining lines were written in pairs to describe a single component
- # in the form:
- # object_name (without random prefix)
- # generation
- # Newlines are used as the delimiter because only newlines and carriage
- # returns are invalid characters in object names, and users can specify
- # a custom prefix in the config file.
- i = 1
- while i < len(lines):
- (name, generation) = (lines[i], lines[i+1])
- if generation == '':
- generation = None
- existing_objects.append(ObjectFromTracker(name, generation))
- i += 2
- return (random_prefix, existing_objects)
-
-def _AppendComponentTrackerToParallelUploadTrackerFile(tracker_file, component,
- tracker_file_lock):
- """Appends information about the uploaded component to the contents of an
- existing tracker file, following the format described in
- _CreateParallelUploadTrackerFile."""
- lines = _GetParallelUploadTrackerFileLinesForComponents([component])
- lines = [line + '\n' for line in lines]
- with tracker_file_lock:
- with open(tracker_file, 'a') as f:
- f.writelines(lines)
-
-def _CreateParallelUploadTrackerFile(tracker_file, random_prefix, components,
- tracker_file_lock):
- """Writes information about components that were successfully uploaded so
- that the upload can be resumed at a later date. The tracker file has
- the format:
- random_prefix
- temp_object_1_name
- temp_object_1_generation
- .
- .
- .
- temp_object_N_name
- temp_object_N_generation
- where N is the number of components that have been successfully uploaded.
-
- Args:
- tracker_file: The name of the parallel upload tracker file.
- random_prefix: The randomly-generated prefix that was used for
- for uploading any existing components.
- components: A list of ObjectFromTracker objects that were uploaded.
- """
- lines = [random_prefix]
- lines += _GetParallelUploadTrackerFileLinesForComponents(components)
- lines = [line + '\n' for line in lines]
- with tracker_file_lock:
- open(tracker_file, 'w').close() # Clear the file.
- with open(tracker_file, 'w') as f:
- f.writelines(lines)
-
-def _GetParallelUploadTrackerFileLinesForComponents(components):
- """Return a list of the lines that should appear in the parallel composite
- upload tracker file representing the given components, using the format
- as described in _CreateParallelUploadTrackerFile."""
- lines = []
- for component in components:
- generation = None
- generation = component.generation
- if not generation:
- generation = ''
- lines += [component.object_name, generation]
- return lines
-
-def FilterExistingComponents(dst_args, existing_components,
- bucket_name, suri_builder):
- """Given the list of all target objects based on partitioning the file and
- the list of objects that have already been uploaded successfully,
- this function determines which objects should be uploaded, which
- existing components are still valid, and which existing components should
- be deleted.
-
- Args:
- dst_args: The map of file_name -> PerformResumableUploadIfAppliesArgs
- calculated by partitioning the file.
- existing_components: A list of ObjectFromTracker objects that have been
- uploaded in the past.
- bucket_name: The name of the bucket in which the components exist.
-
- Returns:
- components_to_upload: List of components that need to be uploaded.
- uploaded_components: List of components that have already been
- uploaded and are still valid.
- existing_objects_to_delete: List of components that have already
- been uploaded, but are no longer valid
- and are in a versioned bucket, and
- therefore should be deleted.
- """
- components_to_upload = []
- existing_component_names = [component.object_name
- for component in existing_components]
- for component_name in dst_args:
- if not (component_name in existing_component_names):
- components_to_upload.append(dst_args[component_name])
-
- objects_already_chosen = []
-
- # Don't reuse any temporary components whose MD5 doesn't match the current
- # MD5 of the corresponding part of the file. If the bucket is versioned,
- # also make sure that we delete the existing temporary version.
- existing_objects_to_delete = []
- uploaded_components = []
- for tracker_object in existing_components:
- if ((not tracker_object.object_name in dst_args.keys())
- or tracker_object.object_name in objects_already_chosen):
- # This could happen if the component size has changed. This also serves
- # to handle object names that get duplicated in the tracker file due
- # to people doing things they shouldn't (e.g., overwriting an existing
- # temporary component in a versioned bucket).
-
- uri = MakeGsUri(bucket_name, tracker_object.object_name, suri_builder)
- uri.generation = tracker_object.generation
- existing_objects_to_delete.append(uri)
- continue
-
- dst_arg = dst_args[tracker_object.object_name]
- file_part = FilePart(dst_arg.filename, dst_arg.file_start,
- dst_arg.file_length)
- # TODO: calculate MD5's in parallel when possible.
- content_md5 = _CalculateMd5FromContents(file_part)
-
- try:
- # Get the MD5 of the currently-existing component.
- blr = BucketListingRef(dst_arg.dst_uri)
- etag = blr.GetKey().etag
- except Exception as e:
- # We don't actually care what went wrong - we couldn't retrieve the
- # object to check the MD5, so just upload it again.
- etag = None
- if etag != (('"%s"') % content_md5):
- components_to_upload.append(dst_arg)
- objects_already_chosen.append(tracker_object.object_name)
- if tracker_object.generation:
- # If the old object doesn't have a generation (i.e., it isn't in a
- # versioned bucket), then we will just overwrite it anyway.
- invalid_component_with_generation = copy.deepcopy(dst_arg.dst_uri)
- invalid_component_with_generation.generation = tracker_object.generation
- existing_objects_to_delete.append(invalid_component_with_generation)
- else:
- uri = copy.deepcopy(dst_arg.dst_uri)
- uri.generation = tracker_object.generation
- uploaded_components.append(uri)
- objects_already_chosen.append(tracker_object.object_name)
-
- if uploaded_components:
- logging.info(("Found %d existing temporary components to reuse.")
- % len(uploaded_components))
-
- return (components_to_upload, uploaded_components,
- existing_objects_to_delete)
-
-def MakeGsUri(bucket, filename, suri_builder):
- """Returns a StorageUri for an object in GCS."""
- return suri_builder.StorageUri(bucket + '/' + filename)
-
-def _CalculateMd5FromContents(file):
- """Calculates the MD5 hash of the contents of a file.
-
- Args:
- file: An already-open file object.
- """
- current_md5 = md5()
- file.seek(0)
- while True:
- data = file.read(8192)
- if not data:
- break
- current_md5.update(data)
- file.seek(0)
- return current_md5.hexdigest()
+ bucket = None
+ if exp_dst_url.IsCloudUrl() and exp_dst_url.IsBucket():
+ try:
+ bucket = self.gsutil_api.GetBucket(
+ exp_dst_url.bucket_name, provider=exp_dst_url.scheme,
+ fields=['versioning'])
+ except AccessDeniedException:
+ raise
+ except NotFoundException:
+ raise CommandException('Destination bucket %s does not exist.' %
+ exp_dst_url)
+ except Exception, e:
+ raise CommandException('Error retrieving destination bucket %s: %s' %
+ (exp_dst_url, e.message))
+ return bucket
« no previous file with comments | « gslib/commands/cors.py ('k') | gslib/commands/defacl.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698