Index: third_party/gsutil/gslib/commands/cp.py |
diff --git a/third_party/gsutil/gslib/commands/cp.py b/third_party/gsutil/gslib/commands/cp.py |
new file mode 100644 |
index 0000000000000000000000000000000000000000..215b505793661d4219fa7ef13db6767c72867798 |
--- /dev/null |
+++ b/third_party/gsutil/gslib/commands/cp.py |
@@ -0,0 +1,1819 @@ |
+# Copyright 2011 Google Inc. All Rights Reserved. |
+# Copyright 2011, Nexenta Systems Inc. |
+# |
+# Licensed under the Apache License, Version 2.0 (the "License"); |
+# you may not use this file except in compliance with the License. |
+# You may obtain a copy of the License at |
+# |
+# http://www.apache.org/licenses/LICENSE-2.0 |
+# |
+# Unless required by applicable law or agreed to in writing, software |
+# distributed under the License is distributed on an "AS IS" BASIS, |
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
+# See the License for the specific language governing permissions and |
+# limitations under the License. |
+ |
+import boto |
+import errno |
+import gzip |
+import hashlib |
+import mimetypes |
+import os |
+import platform |
+import re |
+import subprocess |
+import stat |
+import sys |
+import tempfile |
+import threading |
+import time |
+ |
+from boto import config |
+from boto.exception import GSResponseError |
+from boto.exception import ResumableUploadException |
+from boto.gs.resumable_upload_handler import ResumableUploadHandler |
+from boto.s3.keyfile import KeyFile |
+from boto.s3.resumable_download_handler import ResumableDownloadHandler |
+from boto.storage_uri import BucketStorageUri |
+from gslib.command import COMMAND_NAME |
+from gslib.command import COMMAND_NAME_ALIASES |
+from gslib.command import CONFIG_REQUIRED |
+from gslib.command import Command |
+from gslib.command import FILE_URIS_OK |
+from gslib.command import MAX_ARGS |
+from gslib.command import MIN_ARGS |
+from gslib.command import PROVIDER_URIS_OK |
+from gslib.command import SUPPORTED_SUB_ARGS |
+from gslib.command import URIS_START_ARG |
+from gslib.exception import CommandException |
+from gslib.help_provider import HELP_NAME |
+from gslib.help_provider import HELP_NAME_ALIASES |
+from gslib.help_provider import HELP_ONE_LINE_SUMMARY |
+from gslib.help_provider import HELP_TEXT |
+from gslib.help_provider import HELP_TYPE |
+from gslib.help_provider import HelpType |
+from gslib.name_expansion import NameExpansionIterator |
+from gslib.util import ExtractErrorDetail |
+from gslib.util import IS_WINDOWS |
+from gslib.util import MakeHumanReadable |
+from gslib.util import NO_MAX |
+from gslib.util import TWO_MB |
+from gslib.wildcard_iterator import ContainsWildcard |
+ |
+_detailed_help_text = (""" |
+<B>SYNOPSIS</B> |
+ gsutil cp [OPTION]... src_uri dst_uri |
+ - or - |
+ gsutil cp [OPTION]... src_uri... dst_uri |
+ - or - |
+ gsutil cp [OPTION]... -I dst_uri |
+ |
+ |
+<B>DESCRIPTION</B> |
+ The gsutil cp command allows you to copy data between your local file |
+ system and the cloud, copy data within the cloud, and copy data between |
+ cloud storage providers. For example, to copy all text files from the |
+ local directory to a bucket you could do: |
+ |
+ gsutil cp *.txt gs://my_bucket |
+ |
+ Similarly, you can download text files from a bucket by doing: |
+ |
+ gsutil cp gs://my_bucket/*.txt . |
+ |
+ If you want to copy an entire directory tree you need to use the -R option: |
+ |
+ gsutil cp -R dir gs://my_bucket |
+ |
+ If you have a large number of files to upload you might want to use the |
+ gsutil -m option, to perform a parallel (multi-threaded/multi-processing) |
+ copy: |
+ |
+ gsutil -m cp -R dir gs://my_bucket |
+ |
+ You can pass a list of URIs to copy on STDIN instead of as command line |
+ arguments by using the -I option. This allows you to use gsutil in a |
+ pipeline to copy files and objects as generated by a program, such as: |
+ |
+ some_program | gsutil -m cp -I gs://my_bucket |
+ |
+ The contents of STDIN can name files, cloud URIs, and wildcards of files |
+ and cloud URIs. |
+ |
+ |
+<B>HOW NAMES ARE CONSTRUCTED</B> |
+ The gsutil cp command strives to name objects in a way consistent with how |
+ Linux cp works, which causes names to be constructed in varying ways depending |
+ on whether you're performing a recursive directory copy or copying |
+ individually named objects; and whether you're copying to an existing or |
+ non-existent directory. |
+ |
+ When performing recursive directory copies, object names are constructed |
+ that mirror the source directory structure starting at the point of |
+ recursive processing. For example, the command: |
+ |
+ gsutil cp -R dir1/dir2 gs://my_bucket |
+ |
+ will create objects named like gs://my_bucket/dir2/a/b/c, assuming |
+ dir1/dir2 contains the file a/b/c. |
+ |
+ In contrast, copying individually named files will result in objects named |
+ by the final path component of the source files. For example, the command: |
+ |
+ gsutil cp dir1/dir2/** gs://my_bucket |
+ |
+ will create objects named like gs://my_bucket/c. |
+ |
+ The same rules apply for downloads: recursive copies of buckets and |
+ bucket subdirectories produce a mirrored filename structure, while copying |
+  individually (or wildcard) named objects produces flatly named files. |
+ |
+ Note that in the above example the '**' wildcard matches all names |
+  anywhere under dir1/dir2. The wildcard '*' will match names just one level |
+  deep. For more details see 'gsutil help wildcards'. |
+ |
+ There's an additional wrinkle when working with subdirectories: the resulting |
+ names depend on whether the destination subdirectory exists. For example, |
+ if gs://my_bucket/subdir exists as a subdirectory, the command: |
+ |
+ gsutil cp -R dir1/dir2 gs://my_bucket/subdir |
+ |
+ will create objects named like gs://my_bucket/subdir/dir2/a/b/c. In contrast, |
+ if gs://my_bucket/subdir does not exist, this same gsutil cp command will |
+ create objects named like gs://my_bucket/subdir/a/b/c. |
+ |
+ |
+<B>COPYING TO/FROM SUBDIRECTORIES; DISTRIBUTING TRANSFERS ACROSS MACHINES</B> |
+ You can use gsutil to copy to and from subdirectories by using a command like: |
+ |
+ gsutil cp -R dir gs://my_bucket/data |
+ |
+ This will cause dir and all of its files and nested subdirectories to be |
+ copied under the specified destination, resulting in objects with names like |
+ gs://my_bucket/data/dir/a/b/c. Similarly you can download from bucket |
+ subdirectories by using a command like: |
+ |
+ gsutil cp -R gs://my_bucket/data dir |
+ |
+ This will cause everything nested under gs://my_bucket/data to be downloaded |
+ into dir, resulting in files with names like dir/data/a/b/c. |
+ |
+ Copying subdirectories is useful if you want to add data to an existing |
+ bucket directory structure over time. It's also useful if you want |
+ to parallelize uploads and downloads across multiple machines (often |
+ reducing overall transfer time compared with simply running gsutil -m |
+ cp on one machine). For example, if your bucket contains this structure: |
+ |
+ gs://my_bucket/data/result_set_01/ |
+ gs://my_bucket/data/result_set_02/ |
+ ... |
+ gs://my_bucket/data/result_set_99/ |
+ |
+ you could perform concurrent downloads across 3 machines by running these |
+ commands on each machine, respectively: |
+ |
+ gsutil -m cp -R gs://my_bucket/data/result_set_[0-3]* dir |
+ gsutil -m cp -R gs://my_bucket/data/result_set_[4-6]* dir |
+ gsutil -m cp -R gs://my_bucket/data/result_set_[7-9]* dir |
+ |
+ Note that dir could be a local directory on each machine, or it could |
+ be a directory mounted off of a shared file server; whether the latter |
+ performs acceptably may depend on a number of things, so we recommend |
+ you experiment and find out what works best for you. |
+ |
+ |
+<B>COPYING IN THE CLOUD AND METADATA PRESERVATION</B> |
+ If both the source and destination URI are cloud URIs from the same |
+ provider, gsutil copies data "in the cloud" (i.e., without downloading |
+ to and uploading from the machine where you run gsutil). In addition to |
+ the performance and cost advantages of doing this, copying in the cloud |
+ preserves metadata (like Content-Type and Cache-Control). In contrast, |
+ when you download data from the cloud it ends up in a file, which has |
+ no associated metadata. Thus, unless you have some way to hold on to |
+ or re-create that metadata, downloading to a file will not retain the |
+ metadata. |
+ |
+ Note that by default, the gsutil cp command does not copy the object |
+ ACL to the new object, and instead will use the default bucket ACL (see |
+ "gsutil help setdefacl"). You can override this behavior with the -p |
+ option (see OPTIONS below). |
+ |
+ gsutil does not preserve metadata when copying objects between providers. |
+ |
+ |
+<B>RESUMABLE TRANSFERS</B> |
+ gsutil automatically uses the Google Cloud Storage resumable upload |
+ feature whenever you use the cp command to upload an object that is larger |
+ than 2 MB. You do not need to specify any special command line options |
+ to make this happen. If your upload is interrupted you can restart the |
+ upload by running the same cp command that you ran to start the upload. |
+ |
+ Similarly, gsutil automatically performs resumable downloads (using HTTP |
+ standard Range GET operations) whenever you use the cp command to download an |
+ object larger than 2 MB. |
+ |
+ Resumable uploads and downloads store some state information in a file |
+ in ~/.gsutil named by the destination object or file. If you attempt to |
+ resume a transfer from a machine with a different directory, the transfer |
+ will start over from scratch. |
+ |
+ See also "gsutil help prod" for details on using resumable transfers |
+ in production. |
+ |
+ |
+<B>STREAMING TRANSFERS</B> |
+ Use '-' in place of src_uri or dst_uri to perform a streaming |
+ transfer. For example: |
+ long_running_computation | gsutil cp - gs://my_bucket/obj |
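+ |
+  Similarly, you can stream a download into another program: |
+    gsutil cp gs://my_bucket/obj - | some_program |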
+ |
+ Streaming transfers do not support resumable uploads/downloads. |
+ (The Google resumable transfer protocol has a way to support streaming |
+  transfers, but gsutil doesn't currently implement support for this.) |
+ |
+ |
+<B>CHANGING TEMP DIRECTORIES</B> |
+ gsutil writes data to a temporary directory in several cases: |
+ - when compressing data to be uploaded (see the -z option) |
+ - when decompressing data being downloaded (when the data has |
+ Content-Encoding:gzip, e.g., as happens when uploaded using gsutil cp -z) |
+ - when running integration tests (using the gsutil test command) |
+ |
+ In these cases it's possible the temp file location on your system that |
+ gsutil selects by default may not have enough space. If you find that |
+ gsutil runs out of space during one of these operations (e.g., raising |
+ "CommandException: Inadequate temp space available to compress <your file>" |
+ during a gsutil cp -z operation), you can change where it writes these |
+ temp files by setting the TMPDIR environment variable. On Linux and MacOS |
+ you can do this either by running gsutil this way: |
+ |
+ TMPDIR=/some/directory gsutil cp ... |
+ |
+ or by adding this line to your ~/.bashrc file and then restarting the shell |
+ before running gsutil: |
+ |
+ export TMPDIR=/some/directory |
+ |
+ On Windows 7 you can change the TMPDIR environment variable from Start -> |
+ Computer -> System -> Advanced System Settings -> Environment Variables. |
+ You need to reboot after making this change for it to take effect. (Rebooting |
+ is not necessary after running the export command on Linux and MacOS.) |
+ |
+ |
+<B>OPTIONS</B> |
+  -a canned_acl Sets the named canned_acl on objects as they are uploaded. See |
+ 'gsutil help acls' for further details. |
+ |
+  -c            If an error occurs, continue to attempt to copy the remaining |
+ files. |
+ |
+ -D Copy in "daisy chain" mode, i.e., copying between two buckets by |
+ hooking a download to an upload, via the machine where gsutil is |
+ run. By default, data are copied between two buckets "in the |
+ cloud", i.e., without needing to copy via the machine where |
+ gsutil runs. However, copy-in-the-cloud is not supported when |
+ copying between different locations (like US and EU) or between |
+ different storage classes (like STANDARD and |
+ DURABLE_REDUCED_AVAILABILITY). For these cases, you can use the |
+ -D option to copy data between buckets. |
+ Note: Daisy chain mode is automatically used when copying |
+ between providers (e.g., to copy data from Google Cloud Storage |
+ to another provider). |
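+ |
+                For example, to copy between two buckets in different |
+                locations: |
+ |
+                  gsutil cp -D -R gs://my_us_bucket/* gs://my_eu_bucket |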
+ |
+ -e Exclude symlinks. When specified, symbolic links will not be |
+ copied. |
+ |
+ -n No-clobber. When specified, existing files or objects at the |
+ destination will not be overwritten. Any items that are skipped |
+ by this option will be reported as being skipped. This option |
+ will perform an additional HEAD request to check if an item |
+ exists before attempting to upload the data. This will save |
+ retransmitting data, but the additional HTTP requests may make |
+ small object transfers slower and more expensive. |
+ |
+ This option can be combined with the -c option to build a script |
+ that copies a large number of objects, allowing retries when |
+ some failures occur from which gsutil doesn't automatically |
+ recover, using a bash script like the following: |
+ |
+ status=1 |
+ while [ $status -ne 0 ] ; do |
+ gsutil cp -c -n -R ./dir gs://bucket |
+ status=$? |
+ done |
+ |
+ The -c option will cause copying to continue after failures |
+ occur, and the -n option will cause objects already copied to be |
+ skipped on subsequent iterations. The loop will continue running |
+ as long as gsutil exits with a non-zero status (such a status |
+ indicates there was at least one failure during the gsutil run). |
+ |
+ -p Causes ACLs to be preserved when copying in the cloud. Note that |
+ this option has performance and cost implications, because it |
+ is essentially performing three requests (getacl, cp, setacl). |
+ (The performance issue can be mitigated to some degree by |
+ using gsutil -m cp to cause parallel copying.) |
+ |
+ You can avoid the additional performance and cost of using cp -p |
+ if you want all objects in the destination bucket to end up with |
+ the same ACL by setting a default ACL on that bucket instead of |
+                using cp -p. See "gsutil help setdefacl". |
+ |
+ Note that it's not valid to specify both the -a and -p options |
+ together. |
+ |
+ -q Causes copies to be performed quietly, i.e., without reporting |
+ progress indicators of files being copied. Errors are still |
+ reported. This option can be useful for running gsutil from a |
+ cron job that logs its output to a file, for which the only |
+ information desired in the log is failures. |
+ |
+ -R, -r Causes directories, buckets, and bucket subdirectories to be |
+ copied recursively. If you neglect to use this option for |
+ an upload, gsutil will copy any files it finds and skip any |
+ directories. Similarly, neglecting to specify -R for a download |
+ will cause gsutil to copy any objects at the current bucket |
+ directory level, and skip any subdirectories. |
+ |
+ -v Requests that the version-specific URI for each uploaded object |
+ be printed. Given this URI you can make future upload requests |
+ that are safe in the face of concurrent updates, because Google |
+ Cloud Storage will refuse to perform the update if the current |
+ object version doesn't match the version-specific URI. See |
+ 'gsutil help versioning' for more details. Note: at present this |
+ option does not work correctly for objects copied "in the cloud" |
+ (e.g., gsutil cp gs://bucket/obj1 gs://bucket/obj2). |
+ |
+ -z ext1,... Compresses file uploads with the given extensions. If you are |
+ uploading a large file with compressible content, such as |
+ a .js, .css, or .html file, you can gzip-compress the file |
+ during the upload process by specifying the -z <extensions> |
+ option. Compressing data before upload saves on usage charges |
+ because you are uploading a smaller amount of data. |
+ |
+ When you specify the -z option, the data from your files is |
+ compressed before it is uploaded, but your actual files are left |
+ uncompressed on the local disk. The uploaded objects retain the |
+                content type and name of the original files, but are |
+                given a Content-Encoding header with the value "gzip" to |
+                indicate that the stored object data is compressed on the |
+ Google Cloud Storage servers. |
+ |
+ For example, the following command: |
+ |
+ gsutil cp -z html -a public-read cattypes.html gs://mycats |
+ |
+ will do all of the following: |
+ - Upload as the object gs://mycats/cattypes.html (cp command) |
+ - Set the Content-Type to text/html (based on file extension) |
+ - Compress the data in the file cattypes.html (-z option) |
+ - Set the Content-Encoding to gzip (-z option) |
+ - Set the ACL to public-read (-a option) |
+ - If a user tries to view cattypes.html in a browser, the |
+ browser will know to uncompress the data based on the |
+ Content-Encoding header, and to render it as HTML based on |
+ the Content-Type header. |
+""") |
+ |
+class CpCommand(Command): |
+ """ |
+ Implementation of gsutil cp command. |
+ |
+ Note that CpCommand is run for both gsutil cp and gsutil mv. The latter |
+ happens by MvCommand calling CpCommand and passing the hidden (undocumented) |
+ -M option. This allows the copy and remove needed for each mv to run |
+ together (rather than first running all the cp's and then all the rm's, as |
+ we originally had implemented), which in turn avoids the following problem |
+ with removing the wrong objects: starting with a bucket containing only |
+ the object gs://bucket/obj, say the user does: |
+ gsutil mv gs://bucket/* gs://bucket/d.txt |
+ If we ran all the cp's and then all the rm's and we didn't expand the wildcard |
+ first, the cp command would first copy gs://bucket/obj to gs://bucket/d.txt, |
+ and the rm command would then remove that object. In the implementation |
+ prior to gsutil release 3.12 we avoided this by building a list of objects |
+ to process and then running the copies and then the removes; but building |
+ the list up front limits scalability (compared with the current approach |
+ of processing the bucket listing iterator on the fly). |
+ """ |
+ |
+  # Set default Content-Type. |
+ DEFAULT_CONTENT_TYPE = 'application/octet-stream' |
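+  # If True, shell out to the 'file' command for content type detection |
+  # instead of using Python's mimetypes module (see _SetContentTypeHeader). |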
+ USE_MAGICFILE = boto.config.getbool('GSUtil', 'use_magicfile', False) |
+ |
+ # Command specification (processed by parent class). |
+ command_spec = { |
+ # Name of command. |
+ COMMAND_NAME : 'cp', |
+ # List of command name aliases. |
+ COMMAND_NAME_ALIASES : ['copy'], |
+ # Min number of args required by this command. |
+ MIN_ARGS : 1, |
+ # Max number of args required by this command, or NO_MAX. |
+ MAX_ARGS : NO_MAX, |
+ # Getopt-style string specifying acceptable sub args. |
+ # -t is deprecated but leave intact for now to avoid breakage. |
+ SUPPORTED_SUB_ARGS : 'a:cDeIMNnpqrRtvz:', |
+ # True if file URIs acceptable for this command. |
+ FILE_URIS_OK : True, |
+ # True if provider-only URIs acceptable for this command. |
+ PROVIDER_URIS_OK : False, |
+ # Index in args of first URI arg. |
+ URIS_START_ARG : 0, |
+ # True if must configure gsutil before running command. |
+ CONFIG_REQUIRED : True, |
+ } |
+ help_spec = { |
+ # Name of command or auxiliary help info for which this help applies. |
+ HELP_NAME : 'cp', |
+ # List of help name aliases. |
+ HELP_NAME_ALIASES : ['copy'], |
+ # Type of help: |
+ HELP_TYPE : HelpType.COMMAND_HELP, |
+ # One line summary of this help. |
+ HELP_ONE_LINE_SUMMARY : 'Copy files and objects', |
+ # The full help text. |
+ HELP_TEXT : _detailed_help_text, |
+ } |
+ |
+ def _CheckFinalMd5(self, key, file_name): |
+ """ |
+ Checks that etag from server agrees with md5 computed after the |
+ download completes. |
+ """ |
+ obj_md5 = key.etag.strip('"\'') |
+ file_md5 = None |
+ |
+ if hasattr(key, 'md5') and key.md5: |
+ file_md5 = key.md5 |
+ else: |
+ print 'Computing MD5 from scratch for resumed download' |
+ |
+ # Open file in binary mode to avoid surprises in Windows. |
+ fp = open(file_name, 'rb') |
+ try: |
+ file_md5 = key.compute_md5(fp)[0] |
+ finally: |
+ fp.close() |
+ |
+ if self.debug: |
+ print 'Checking file md5 against etag. (%s/%s)' % (file_md5, obj_md5) |
+ if file_md5 != obj_md5: |
+ # Checksums don't match - remove file and raise exception. |
+ os.unlink(file_name) |
+ raise CommandException( |
+ 'File changed during download: md5 signature doesn\'t match ' |
+ 'etag (incorrect downloaded file deleted)') |
+ |
+ def _CheckForDirFileConflict(self, exp_src_uri, dst_uri): |
+ """Checks whether copying exp_src_uri into dst_uri is not possible. |
+ |
+ This happens if a directory exists in local file system where a file |
+ needs to go or vice versa. In that case we print an error message and |
+ exits. Example: if the file "./x" exists and you try to do: |
+ gsutil cp gs://mybucket/x/y . |
+ the request can't succeed because it requires a directory where |
+ the file x exists. |
+ |
+ Note that we don't enforce any corresponding restrictions for buckets, |
+ because the flat namespace semantics for buckets doesn't prohibit such |
+ cases the way hierarchical file systems do. For example, if a bucket |
+ contains an object called gs://bucket/dir and then you run the command: |
+ gsutil cp file1 file2 gs://bucket/dir |
+ you'll end up with objects gs://bucket/dir, gs://bucket/dir/file1, and |
+ gs://bucket/dir/file2. |
+ |
+ Args: |
+ exp_src_uri: Expanded source StorageUri of copy. |
+ dst_uri: Destination URI. |
+ |
+ Raises: |
+ CommandException: if errors encountered. |
+ """ |
+ if dst_uri.is_cloud_uri(): |
+ # The problem can only happen for file destination URIs. |
+ return |
+ dst_path = dst_uri.object_name |
+ final_dir = os.path.dirname(dst_path) |
+ if os.path.isfile(final_dir): |
+ raise CommandException('Cannot retrieve %s because a file exists ' |
+ 'where a directory needs to be created (%s).' % |
+ (exp_src_uri, final_dir)) |
+ if os.path.isdir(dst_path): |
+ raise CommandException('Cannot retrieve %s because a directory exists ' |
+ '(%s) where the file needs to be created.' % |
+ (exp_src_uri, dst_path)) |
+ |
+ def _InsistDstUriNamesContainer(self, exp_dst_uri, |
+ have_existing_dst_container, command_name): |
+ """ |
+ Raises an exception if URI doesn't name a directory, bucket, or bucket |
+ subdir, with special exception for cp -R (see comments below). |
+ |
+ Args: |
+      exp_dst_uri: Wildcard-expanded dst_uri. |
+ have_existing_dst_container: bool indicator of whether exp_dst_uri |
+ names a container (directory, bucket, or existing bucket subdir). |
+ command_name: Name of command making call. May not be the same as |
+ self.command_name in the case of commands implemented atop other |
+ commands (like mv command). |
+ |
+ Raises: |
+ CommandException: if the URI being checked does not name a container. |
+ """ |
+ if exp_dst_uri.is_file_uri(): |
+ ok = exp_dst_uri.names_directory() |
+ else: |
+ if have_existing_dst_container: |
+ ok = True |
+ else: |
+ # It's ok to specify a non-existing bucket subdir, for example: |
+ # gsutil cp -R dir gs://bucket/abc |
+ # where gs://bucket/abc isn't an existing subdir. |
+ ok = exp_dst_uri.names_object() |
+ if not ok: |
+ raise CommandException('Destination URI must name a directory, bucket, ' |
+ 'or bucket\nsubdirectory for the multiple ' |
+ 'source form of the %s command.' % command_name) |
+ |
+ class _FileCopyCallbackHandler(object): |
+ """Outputs progress info for large copy requests.""" |
+ |
+ def __init__(self, upload): |
+ if upload: |
+ self.announce_text = 'Uploading' |
+ else: |
+ self.announce_text = 'Downloading' |
+ |
+ def call(self, total_bytes_transferred, total_size): |
+ sys.stderr.write('%s: %s/%s \r' % ( |
+ self.announce_text, |
+ MakeHumanReadable(total_bytes_transferred), |
+ MakeHumanReadable(total_size))) |
+ if total_bytes_transferred == total_size: |
+ sys.stderr.write('\n') |
+ |
+ class _StreamCopyCallbackHandler(object): |
+ """Outputs progress info for Stream copy to cloud. |
+ Total Size of the stream is not known, so we output |
+ only the bytes transferred. |
+ """ |
+ |
+ def call(self, total_bytes_transferred, total_size): |
+ sys.stderr.write('Uploading: %s \r' % ( |
+ MakeHumanReadable(total_bytes_transferred))) |
+ if total_size and total_bytes_transferred == total_size: |
+ sys.stderr.write('\n') |
+ |
+ def _GetTransferHandlers(self, dst_uri, size, upload): |
+ """ |
+ Selects upload/download and callback handlers. |
+ |
+ We use a callback handler that shows a simple textual progress indicator |
+ if size is above the configurable threshold. |
+ |
+ We use a resumable transfer handler if size is >= the configurable |
+ threshold and resumable transfers are supported by the given provider. |
+ boto supports resumable downloads for all providers, but resumable |
+ uploads are currently only supported by GS. |
+ |
+ Args: |
+ dst_uri: the destination URI. |
+ size: size of file (object) being uploaded (downloaded). |
+ upload: bool indication of whether transfer is an upload. |
+ """ |
+ config = boto.config |
+ resumable_threshold = config.getint('GSUtil', 'resumable_threshold', TWO_MB) |
+ transfer_handler = None |
+ cb = None |
+ num_cb = None |
+ |
+ # Checks whether the destination file is a "special" file, like /dev/null on |
+ # Linux platforms or null on Windows platforms, so we can disable resumable |
+ # download support since the file size of the destination won't ever be |
+ # correct. |
+ dst_is_special = False |
+ if dst_uri.is_file_uri(): |
+ # Check explicitly first because os.stat doesn't work on 'nul' in Windows. |
+ if dst_uri.object_name == os.devnull: |
+ dst_is_special = True |
+ try: |
+ mode = os.stat(dst_uri.object_name).st_mode |
+ if stat.S_ISCHR(mode): |
+ dst_is_special = True |
+ except OSError: |
+ pass |
+ |
+ if size >= resumable_threshold and not dst_is_special: |
+ if not self.quiet: |
+ cb = self._FileCopyCallbackHandler(upload).call |
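+        # Request roughly one progress callback per 2 MB transferred. |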
+ num_cb = int(size / TWO_MB) |
+ |
+ resumable_tracker_dir = config.get( |
+ 'GSUtil', 'resumable_tracker_dir', |
+ os.path.expanduser('~' + os.sep + '.gsutil')) |
+ if not os.path.exists(resumable_tracker_dir): |
+ os.makedirs(resumable_tracker_dir) |
+ |
+ if upload: |
+ # Encode the dest bucket and object name into the tracker file name. |
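+        # For example, a dest of gs://bucket/a/b.txt yields (before hashing) |
+        # 'resumable_upload__bucket__a_b.txt.url'. |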
+ res_tracker_file_name = ( |
+ re.sub('[/\\\\]', '_', 'resumable_upload__%s__%s.url' % |
+ (dst_uri.bucket_name, dst_uri.object_name))) |
+ else: |
+ # Encode the fully-qualified dest file name into the tracker file name. |
+ res_tracker_file_name = ( |
+ re.sub('[/\\\\]', '_', 'resumable_download__%s.etag' % |
+ (os.path.realpath(dst_uri.object_name)))) |
+ |
+ res_tracker_file_name = _hash_filename(res_tracker_file_name) |
+ tracker_file = '%s%s%s' % (resumable_tracker_dir, os.sep, |
+ res_tracker_file_name) |
+ if upload: |
+ if dst_uri.scheme == 'gs': |
+ transfer_handler = ResumableUploadHandler(tracker_file) |
+ else: |
+ transfer_handler = ResumableDownloadHandler(tracker_file) |
+ |
+ return (cb, num_cb, transfer_handler) |
+ |
+ def _LogCopyOperation(self, src_uri, dst_uri, headers): |
+ """ |
+ Logs copy operation being performed, including Content-Type if appropriate. |
+ """ |
+ if self.quiet: |
+ return |
+ if 'Content-Type' in headers and dst_uri.is_cloud_uri(): |
+ content_type_msg = ' [Content-Type=%s]' % headers['Content-Type'] |
+ else: |
+ content_type_msg = '' |
+ if src_uri.is_stream(): |
+ self.THREADED_LOGGER.info('Copying from <STDIN>%s...', content_type_msg) |
+ else: |
+ self.THREADED_LOGGER.info('Copying %s%s...', src_uri, content_type_msg) |
+ |
+ # We pass the headers explicitly to this call instead of using self.headers |
+  # so we can set different metadata (like Content-Type) for each object. |
+ def _CopyObjToObjInTheCloud(self, src_key, src_uri, dst_uri, headers): |
+ """Performs copy-in-the cloud from specified src to dest object. |
+ |
+ Args: |
+ src_key: Source Key. |
+ src_uri: Source StorageUri. |
+ dst_uri: Destination StorageUri. |
+ headers: A copy of the headers dictionary. |
+ |
+ Returns: |
+ (elapsed_time, bytes_transferred, dst_uri) excluding overhead like initial |
+ HEAD. Note: At present copy-in-the-cloud doesn't return the generation of |
+ the created object, so the returned URI is actually not version-specific |
+ (unlike other cp cases). |
+ |
+ Raises: |
+ CommandException: if errors encountered. |
+ """ |
+ self._SetContentTypeHeader(src_uri, headers) |
+ self._LogCopyOperation(src_uri, dst_uri, headers) |
+ # Do Object -> object copy within same provider (uses |
+ # x-<provider>-copy-source metadata HTTP header to request copying at the |
+ # server). |
+ src_bucket = src_uri.get_bucket(False, headers) |
+ preserve_acl = False |
+ canned_acl = None |
+ if self.sub_opts: |
+ for o, a in self.sub_opts: |
+ if o == '-a': |
+ canned_acls = dst_uri.canned_acls() |
+ if a not in canned_acls: |
+ raise CommandException('Invalid canned ACL "%s".' % a) |
+ canned_acl = a |
+ headers[dst_uri.get_provider().acl_header] = canned_acl |
+ if o == '-p': |
+ preserve_acl = True |
+ if preserve_acl and canned_acl: |
+ raise CommandException( |
+ 'Specifying both the -p and -a options together is invalid.') |
+ start_time = time.time() |
+ # Pass headers in headers param not metadata param, so boto will copy |
+ # existing key's metadata and just set the additional headers specified |
+ # in the headers param (rather than using the headers to override existing |
+ # metadata). In particular this allows us to copy the existing key's |
+ # Content-Type and other metadata users need while still being able to |
+ # set headers the API needs (like x-goog-project-id). Note that this means |
+ # you can't do something like: |
+ # gsutil cp -t Content-Type text/html gs://bucket/* gs://bucket2 |
+ # to change the Content-Type while copying. |
+ |
+ try: |
+ dst_key = dst_uri.copy_key( |
+          src_bucket.name, src_uri.object_name, preserve_acl=preserve_acl, |
+ headers=headers, src_version_id=src_uri.version_id, |
+ src_generation=src_uri.generation) |
+ except GSResponseError as e: |
+ exc_name, error_detail = ExtractErrorDetail(e) |
+ if (exc_name == 'GSResponseError' |
+ and ('Copy-in-the-cloud disallowed' in error_detail)): |
+ raise CommandException('%s.\nNote: you can copy between locations ' |
+ 'and between storage classes by using the ' |
+ 'gsutil cp -D option.' % error_detail) |
+ else: |
+ raise |
+ end_time = time.time() |
+ return (end_time - start_time, src_key.size, |
+ dst_uri.clone_replace_key(dst_key)) |
+ |
+ def _CheckFreeSpace(self, path): |
+ """Return path/drive free space (in bytes).""" |
+ if platform.system() == 'Windows': |
+      from ctypes import c_int, c_uint64, c_char_p, c_wchar_p, windll, POINTER, WINFUNCTYPE, WinError |
+ try: |
+ GetDiskFreeSpaceEx = WINFUNCTYPE(c_int, c_wchar_p, POINTER(c_uint64), |
+ POINTER(c_uint64), POINTER(c_uint64)) |
+ GetDiskFreeSpaceEx = GetDiskFreeSpaceEx( |
+ ('GetDiskFreeSpaceExW', windll.kernel32), ( |
+ (1, 'lpszPathName'), |
+ (2, 'lpFreeUserSpace'), |
+ (2, 'lpTotalSpace'), |
+ (2, 'lpFreeSpace'),)) |
+ except AttributeError: |
+ GetDiskFreeSpaceEx = WINFUNCTYPE(c_int, c_char_p, POINTER(c_uint64), |
+ POINTER(c_uint64), POINTER(c_uint64)) |
+ GetDiskFreeSpaceEx = GetDiskFreeSpaceEx( |
+ ('GetDiskFreeSpaceExA', windll.kernel32), ( |
+ (1, 'lpszPathName'), |
+ (2, 'lpFreeUserSpace'), |
+ (2, 'lpTotalSpace'), |
+ (2, 'lpFreeSpace'),)) |
+ |
+ def GetDiskFreeSpaceEx_errcheck(result, func, args): |
+ if not result: |
+ raise WinError() |
+ return args[1].value |
+ GetDiskFreeSpaceEx.errcheck = GetDiskFreeSpaceEx_errcheck |
+ |
+ return GetDiskFreeSpaceEx(os.getenv('SystemDrive')) |
+ else: |
+ (_, f_frsize, _, _, f_bavail, _, _, _, _, _) = os.statvfs(path) |
+ return f_frsize * f_bavail |
+ |
+ def _PerformResumableUploadIfApplies(self, fp, dst_uri, canned_acl, headers): |
+ """ |
+ Performs resumable upload if supported by provider and file is above |
+ threshold, else performs non-resumable upload. |
+ |
+ Returns (elapsed_time, bytes_transferred, version-specific dst_uri). |
+ """ |
+ start_time = time.time() |
+ # Determine file size different ways for case where fp is actually a wrapper |
+ # around a Key vs an actual file. |
+ if isinstance(fp, KeyFile): |
+ file_size = fp.getkey().size |
+ else: |
+ file_size = os.path.getsize(fp.name) |
+ (cb, num_cb, res_upload_handler) = self._GetTransferHandlers( |
+ dst_uri, file_size, True) |
+ if dst_uri.scheme == 'gs': |
+ # Resumable upload protocol is Google Cloud Storage-specific. |
+ dst_uri.set_contents_from_file(fp, headers, policy=canned_acl, |
+ cb=cb, num_cb=num_cb, |
+ res_upload_handler=res_upload_handler) |
+ else: |
+ dst_uri.set_contents_from_file(fp, headers, policy=canned_acl, |
+ cb=cb, num_cb=num_cb) |
+ if res_upload_handler: |
+ # ResumableUploadHandler does not update upload_start_point from its |
+ # initial value of -1 if transferring the whole file, so clamp at 0 |
+ bytes_transferred = file_size - max( |
+ res_upload_handler.upload_start_point, 0) |
+ else: |
+ bytes_transferred = file_size |
+ end_time = time.time() |
+ return (end_time - start_time, bytes_transferred, dst_uri) |
+ |
+ def _PerformStreamingUpload(self, fp, dst_uri, headers, canned_acl=None): |
+ """ |
+ Performs a streaming upload to the cloud. |
+ |
+ Args: |
+ fp: The file whose contents to upload. |
+ dst_uri: Destination StorageUri. |
+ headers: A copy of the headers dictionary. |
+ canned_acl: Optional canned ACL to set on the object. |
+ |
+ Returns (elapsed_time, bytes_transferred, version-specific dst_uri). |
+ """ |
+ start_time = time.time() |
+ |
+ if self.quiet: |
+ cb = None |
+ else: |
+ cb = self._StreamCopyCallbackHandler().call |
+ dst_uri.set_contents_from_stream( |
+ fp, headers, policy=canned_acl, cb=cb) |
+ try: |
+ bytes_transferred = fp.tell() |
+ except: |
+ bytes_transferred = 0 |
+ |
+ end_time = time.time() |
+ return (end_time - start_time, bytes_transferred, dst_uri) |
+ |
+ def _SetContentTypeHeader(self, src_uri, headers): |
+ """ |
+ Sets content type header to value specified in '-h Content-Type' option (if |
+ specified); else sets using Content-Type detection. |
+ """ |
+ if 'Content-Type' in headers: |
+ # If empty string specified (i.e., -h "Content-Type:") set header to None, |
+ # which will inhibit boto from sending the CT header. Otherwise, boto will |
+ # pass through the user specified CT header. |
+ if not headers['Content-Type']: |
+ headers['Content-Type'] = None |
+ # else we'll keep the value passed in via -h option (not performing |
+ # content type detection). |
+ else: |
+      # Only do content type recognition if src_uri is a file. Object-to-object |
+ # copies with no -h Content-Type specified re-use the content type of the |
+ # source object. |
+ if src_uri.is_file_uri(): |
+ object_name = src_uri.object_name |
+ content_type = None |
+ # Streams (denoted by '-') are expected to be 'application/octet-stream' |
+ # and 'file' would partially consume them. |
+ if object_name != '-': |
+ if self.USE_MAGICFILE: |
+ p = subprocess.Popen(['file', '--mime-type', object_name], |
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE) |
+ output, error = p.communicate() |
+ if p.returncode != 0 or error: |
+ raise CommandException( |
+ 'Encountered error running "file --mime-type %s" ' |
+ '(returncode=%d).\n%s' % (object_name, p.returncode, error)) |
+              # Parse output by removing the line delimiter and splitting on the last ": ". |
+ content_type = output.rstrip().rpartition(': ')[2] |
+ else: |
+ content_type = mimetypes.guess_type(object_name)[0] |
+ if not content_type: |
+ content_type = self.DEFAULT_CONTENT_TYPE |
+ headers['Content-Type'] = content_type |
+ |
+ def _UploadFileToObject(self, src_key, src_uri, dst_uri, headers, |
+ should_log=True): |
+ """Uploads a local file to an object. |
+ |
+ Args: |
+      src_key: Source Key (for the local file being uploaded). |
+ src_uri: Source StorageUri. |
+ dst_uri: Destination StorageUri. |
+ headers: The headers dictionary. |
+ should_log: bool indicator whether we should log this operation. |
+ Returns: |
+ (elapsed_time, bytes_transferred, version-specific dst_uri), excluding |
+ overhead like initial HEAD. |
+ |
+ Raises: |
+ CommandException: if errors encountered. |
+ """ |
+ gzip_exts = [] |
+ canned_acl = None |
+ if self.sub_opts: |
+ for o, a in self.sub_opts: |
+ if o == '-a': |
+ canned_acls = dst_uri.canned_acls() |
+ if a not in canned_acls: |
+ raise CommandException('Invalid canned ACL "%s".' % a) |
+ canned_acl = a |
+ elif o == '-t': |
+ print('Warning: -t is deprecated, and will be removed in the future. ' |
+ 'Content type\ndetection is ' |
+ 'now performed by default, unless inhibited by specifying ' |
+ 'a\nContent-Type header via the -h option.') |
+ elif o == '-z': |
+ gzip_exts = a.split(',') |
+ |
+ self._SetContentTypeHeader(src_uri, headers) |
+ if should_log: |
+ self._LogCopyOperation(src_uri, dst_uri, headers) |
+ |
+ if 'Content-Language' not in headers: |
+ content_language = config.get_value('GSUtil', 'content_language') |
+ if content_language: |
+ headers['Content-Language'] = content_language |
+ |
+ fname_parts = src_uri.object_name.split('.') |
+ if len(fname_parts) > 1 and fname_parts[-1] in gzip_exts: |
+ if self.debug: |
+ print 'Compressing %s (to tmp)...' % src_key |
+ (gzip_fh, gzip_path) = tempfile.mkstemp() |
+ gzip_fp = None |
+ try: |
+ # Check for temp space. Assume the compressed object is at most 2x |
+ # the size of the object (normally should compress to smaller than |
+ # the object) |
+ if (self._CheckFreeSpace(gzip_path) |
+ < 2*int(os.path.getsize(src_key.name))): |
+ raise CommandException('Inadequate temp space available to compress ' |
+ '%s' % src_key.name) |
+ gzip_fp = gzip.open(gzip_path, 'wb') |
+ gzip_fp.writelines(src_key.fp) |
+ finally: |
+ if gzip_fp: |
+ gzip_fp.close() |
+ os.close(gzip_fh) |
+ headers['Content-Encoding'] = 'gzip' |
+ gzip_fp = open(gzip_path, 'rb') |
+ try: |
+ (elapsed_time, bytes_transferred, result_uri) = ( |
+ self._PerformResumableUploadIfApplies(gzip_fp, dst_uri, |
+ canned_acl, headers)) |
+ finally: |
+ gzip_fp.close() |
+ try: |
+ os.unlink(gzip_path) |
+ # Windows sometimes complains the temp file is locked when you try to |
+ # delete it. |
+ except Exception, e: |
+ pass |
+ elif (src_key.is_stream() |
+ and dst_uri.get_provider().supports_chunked_transfer()): |
+ (elapsed_time, bytes_transferred, result_uri) = ( |
+ self._PerformStreamingUpload(src_key.fp, dst_uri, headers, |
+ canned_acl)) |
+ else: |
+ if src_key.is_stream(): |
+        # For providers that don't support chunked transfers. |
+ tmp = tempfile.NamedTemporaryFile() |
+ file_uri = self.suri_builder.StorageUri('file://%s' % tmp.name) |
+ try: |
+ file_uri.new_key(False, headers).set_contents_from_file( |
+ src_key.fp, headers) |
+ src_key = file_uri.get_key() |
+ finally: |
+ file_uri.close() |
+ try: |
+ (elapsed_time, bytes_transferred, result_uri) = ( |
+ self._PerformResumableUploadIfApplies(src_key.fp, dst_uri, |
+ canned_acl, headers)) |
+ finally: |
+ if src_key.is_stream(): |
+ tmp.close() |
+ else: |
+ src_key.close() |
+ |
+ return (elapsed_time, bytes_transferred, result_uri) |
+ |
+ def _DownloadObjectToFile(self, src_key, src_uri, dst_uri, headers, |
+ should_log=True): |
+ """Downloads an object to a local file. |
+ |
+ Args: |
+      src_key: Source Key (for the cloud object being downloaded). |
+ src_uri: Source StorageUri. |
+ dst_uri: Destination StorageUri. |
+ headers: The headers dictionary. |
+ should_log: bool indicator whether we should log this operation. |
+ Returns: |
+ (elapsed_time, bytes_transferred, dst_uri), excluding overhead like |
+ initial HEAD. |
+ |
+ Raises: |
+ CommandException: if errors encountered. |
+ """ |
+ if should_log: |
+ self._LogCopyOperation(src_uri, dst_uri, headers) |
+ (cb, num_cb, res_download_handler) = self._GetTransferHandlers( |
+ dst_uri, src_key.size, False) |
+ file_name = dst_uri.object_name |
+ dir_name = os.path.dirname(file_name) |
+ if dir_name and not os.path.exists(dir_name): |
+ # Do dir creation in try block so can ignore case where dir already |
+ # exists. This is needed to avoid a race condition when running gsutil |
+ # -m cp. |
+ try: |
+ os.makedirs(dir_name) |
+ except OSError, e: |
+ if e.errno != errno.EEXIST: |
+ raise |
+ # For gzipped objects not named *.gz download to a temp file and unzip. |
+ if (hasattr(src_key, 'content_encoding') |
+ and src_key.content_encoding == 'gzip' |
+ and not file_name.endswith('.gz')): |
+ # We can't use tempfile.mkstemp() here because we need a predictable |
+ # filename for resumable downloads. |
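+      # For example, an object being downloaded to 'data.json' is first |
+      # written to 'data.json_.gztmp' and then decompressed to 'data.json'. |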
+ download_file_name = '%s_.gztmp' % file_name |
+ need_to_unzip = True |
+ else: |
+ download_file_name = file_name |
+ need_to_unzip = False |
+ fp = None |
+ try: |
+ if res_download_handler: |
+ fp = open(download_file_name, 'ab') |
+ else: |
+ fp = open(download_file_name, 'wb') |
+ start_time = time.time() |
+ src_key.get_contents_to_file(fp, headers, cb=cb, num_cb=num_cb, |
+ res_download_handler=res_download_handler) |
+ # If a custom test method is defined, call it here. For the copy command, |
+ # test methods are expected to take one argument: an open file pointer, |
+ # and are used to perturb the open file during download to exercise |
+ # download error detection. |
+ if self.test_method: |
+ self.test_method(fp) |
+ end_time = time.time() |
+ finally: |
+ if fp: |
+ fp.close() |
+ |
+ # Discard the md5 if we are resuming a partial download. |
+ if res_download_handler and res_download_handler.download_start_point: |
+ src_key.md5 = None |
+ |
+ # Verify downloaded file checksum matched source object's checksum. |
+ self._CheckFinalMd5(src_key, download_file_name) |
+ |
+ if res_download_handler: |
+ bytes_transferred = ( |
+ src_key.size - res_download_handler.download_start_point) |
+ else: |
+ bytes_transferred = src_key.size |
+ if need_to_unzip: |
+ # Log that we're uncompressing if the file is big enough that |
+ # decompressing would make it look like the transfer "stalled" at the end. |
+ if not self.quiet and bytes_transferred > 10 * 1024 * 1024: |
+ self.THREADED_LOGGER.info('Uncompressing downloaded tmp file to %s...', |
+ file_name) |
+ # Downloaded gzipped file to a filename w/o .gz extension, so unzip. |
+ f_in = gzip.open(download_file_name, 'rb') |
+ f_out = open(file_name, 'wb') |
+ try: |
+ while True: |
+ data = f_in.read(8192) |
+ if not data: |
+ break |
+ f_out.write(data) |
+ finally: |
+ f_out.close() |
+ f_in.close() |
+ os.unlink(download_file_name) |
+ return (end_time - start_time, bytes_transferred, dst_uri) |
+ |
+ def _PerformDownloadToStream(self, src_key, src_uri, str_fp, headers): |
+ (cb, num_cb, res_download_handler) = self._GetTransferHandlers( |
+ src_uri, src_key.size, False) |
+ start_time = time.time() |
+ src_key.get_contents_to_file(str_fp, headers, cb=cb, num_cb=num_cb) |
+    end_time = time.time() |
+    bytes_transferred = src_key.size |
+ return (end_time - start_time, bytes_transferred) |
+ |
+ def _CopyFileToFile(self, src_key, src_uri, dst_uri, headers): |
+ """Copies a local file to a local file. |
+ |
+ Args: |
+      src_key: Source Key (for the local source file). |
+ src_uri: Source StorageUri. |
+ dst_uri: Destination StorageUri. |
+ headers: The headers dictionary. |
+ Returns: |
+ (elapsed_time, bytes_transferred, dst_uri), excluding |
+ overhead like initial HEAD. |
+ |
+ Raises: |
+ CommandException: if errors encountered. |
+ """ |
+ self._LogCopyOperation(src_uri, dst_uri, headers) |
+ dst_key = dst_uri.new_key(False, headers) |
+ start_time = time.time() |
+ dst_key.set_contents_from_file(src_key.fp, headers) |
+ end_time = time.time() |
+ return (end_time - start_time, os.path.getsize(src_key.fp.name), dst_uri) |
+ |
+ def _CopyObjToObjDaisyChainMode(self, src_key, src_uri, dst_uri, headers): |
+ """Copies from src_uri to dst_uri in "daisy chain" mode. |
+ See -D OPTION documentation about what daisy chain mode is. |
+ |
+ Args: |
+ src_key: Source Key. |
+ src_uri: Source StorageUri. |
+ dst_uri: Destination StorageUri. |
+ headers: A copy of the headers dictionary. |
+ |
+ Returns: |
+ (elapsed_time, bytes_transferred, version-specific dst_uri) excluding |
+ overhead like initial HEAD. |
+ |
+ Raises: |
+ CommandException: if errors encountered. |
+ """ |
+ self._SetContentTypeHeader(src_uri, headers) |
+ self._LogCopyOperation(src_uri, dst_uri, headers) |
+ canned_acl = None |
+ if self.sub_opts: |
+ for o, a in self.sub_opts: |
+ if o == '-a': |
+ canned_acls = dst_uri.canned_acls() |
+ if a not in canned_acls: |
+ raise CommandException('Invalid canned ACL "%s".' % a) |
+ canned_acl = a |
+ elif o == '-p': |
+ # We don't attempt to preserve ACLs across providers because |
+ # GCS and S3 support different ACLs and disjoint principals. |
+ raise NotImplementedError('Cross-provider cp -p not supported') |
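+    # KeyFile wraps the source Key in a file-like object, so the upload reads |
+    # (and thereby downloads) the source data incrementally rather than |
+    # staging the whole object in a local file first. |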
+ return self._PerformResumableUploadIfApplies(KeyFile(src_key), dst_uri, |
+ canned_acl, headers) |
+ |
+ def _PerformCopy(self, src_uri, dst_uri): |
+ """Performs copy from src_uri to dst_uri, handling various special cases. |
+ |
+ Args: |
+ src_uri: Source StorageUri. |
+ dst_uri: Destination StorageUri. |
+ |
+ Returns: |
+ (elapsed_time, bytes_transferred, version-specific dst_uri) excluding |
+ overhead like initial HEAD. |
+ |
+ Raises: |
+ CommandException: if errors encountered. |
+ """ |
+ # Make a copy of the input headers each time so we can set a different |
+ # content type for each object. |
+ if self.headers: |
+ headers = self.headers.copy() |
+ else: |
+ headers = {} |
+ |
+ src_key = src_uri.get_key(False, headers) |
+ if not src_key: |
+ raise CommandException('"%s" does not exist.' % src_uri) |
+ |
+ # On Windows, stdin is opened as text mode instead of binary which causes |
+ # problems when piping a binary file, so this switches it to binary mode. |
+ if IS_WINDOWS and src_uri.is_file_uri() and src_key.is_stream(): |
+ import msvcrt |
+ msvcrt.setmode(src_key.fp.fileno(), os.O_BINARY) |
+ |
+ if self.no_clobber: |
+ # There are two checks to prevent clobbering: |
+ # 1) The first check is to see if the item |
+ # already exists at the destination and prevent the upload/download |
+ # from happening. This is done by the exists() call. |
+ # 2) The second check is only relevant if we are writing to gs. We can |
+ # enforce that the server only writes the object if it doesn't exist |
+ # by specifying the header below. This check only happens at the |
+ # server after the complete file has been uploaded. We specify this |
+ # header to prevent a race condition where a destination file may |
+ # be created after the first check and before the file is fully |
+ # uploaded. |
+ # In order to save on unnecessary uploads/downloads we perform both |
+ # checks. However, this may come at the cost of additional HTTP calls. |
+ if dst_uri.exists(headers): |
+ if not self.quiet: |
+ self.THREADED_LOGGER.info('Skipping existing item: %s' % |
+ dst_uri.uri) |
+ return (0, 0, None) |
+ if dst_uri.is_cloud_uri() and dst_uri.scheme == 'gs': |
+ headers['x-goog-if-generation-match'] = '0' |
+ |
+ if src_uri.is_cloud_uri() and dst_uri.is_cloud_uri(): |
+ if src_uri.scheme == dst_uri.scheme and not self.daisy_chain: |
+ return self._CopyObjToObjInTheCloud(src_key, src_uri, dst_uri, headers) |
+ else: |
+ return self._CopyObjToObjDaisyChainMode(src_key, src_uri, dst_uri, |
+ headers) |
+ elif src_uri.is_file_uri() and dst_uri.is_cloud_uri(): |
+ return self._UploadFileToObject(src_key, src_uri, dst_uri, headers) |
+ elif src_uri.is_cloud_uri() and dst_uri.is_file_uri(): |
+ return self._DownloadObjectToFile(src_key, src_uri, dst_uri, headers) |
+ elif src_uri.is_file_uri() and dst_uri.is_file_uri(): |
+ return self._CopyFileToFile(src_key, src_uri, dst_uri, headers) |
+ else: |
+ raise CommandException('Unexpected src/dest case') |
+ |
+ def _ExpandDstUri(self, dst_uri_str): |
+ """ |
+ Expands wildcard if present in dst_uri_str. |
+ |
+ Args: |
+ dst_uri_str: String representation of requested dst_uri. |
+ |
+ Returns: |
+ (exp_dst_uri, have_existing_dst_container) |
+ where have_existing_dst_container is a bool indicating whether |
+ exp_dst_uri names an existing directory, bucket, or bucket subdirectory. |
+ |
+ Raises: |
+ CommandException: if dst_uri_str matched more than 1 URI. |
+ """ |
+ dst_uri = self.suri_builder.StorageUri(dst_uri_str) |
+ |
+ # Handle wildcarded dst_uri case. |
+ if ContainsWildcard(dst_uri): |
+ blr_expansion = list(self.WildcardIterator(dst_uri)) |
+ if len(blr_expansion) != 1: |
+ raise CommandException('Destination (%s) must match exactly 1 URI' % |
+ dst_uri_str) |
+ blr = blr_expansion[0] |
+ uri = blr.GetUri() |
+ if uri.is_cloud_uri(): |
+ return (uri, uri.names_bucket() or blr.HasPrefix() |
+ or blr.GetKey().endswith('/')) |
+ else: |
+ return (uri, uri.names_directory()) |
+ |
+ # Handle non-wildcarded dst_uri: |
+ if dst_uri.is_file_uri(): |
+ return (dst_uri, dst_uri.names_directory()) |
+ if dst_uri.names_bucket(): |
+ return (dst_uri, True) |
+ # For object URIs check 3 cases: (a) if the name ends with '/' treat as a |
+ # subdir; else, perform a wildcard expansion with dst_uri + "*" and then |
+ # find if (b) there's a Prefix matching dst_uri, or (c) name is of form |
+ # dir_$folder$ (and in both these cases also treat dir as a subdir). |
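+    # (Some GUI tools create zero-length 'dir_$folder$' placeholder objects |
+    # to represent empty directories in a bucket's flat namespace.) |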
+ if dst_uri.is_cloud_uri() and dst_uri_str.endswith('/'): |
+ return (dst_uri, True) |
+ blr_expansion = list(self.WildcardIterator( |
+ '%s*' % dst_uri_str.rstrip(dst_uri.delim))) |
+ for blr in blr_expansion: |
+ if blr.GetRStrippedUriString().endswith('_$folder$'): |
+ return (dst_uri, True) |
+ if blr.GetRStrippedUriString() == dst_uri_str.rstrip(dst_uri.delim): |
+ return (dst_uri, blr.HasPrefix()) |
+ return (dst_uri, False) |
+ |
+ def _ConstructDstUri(self, src_uri, exp_src_uri, |
+ src_uri_names_container, src_uri_expands_to_multi, |
+ have_multiple_srcs, exp_dst_uri, |
+ have_existing_dest_subdir): |
+ """ |
+ Constructs the destination URI for a given exp_src_uri/exp_dst_uri pair, |
+ using context-dependent naming rules that mimic Linux cp and mv behavior. |
+ |
+ Args: |
+ src_uri: src_uri to be copied. |
+ exp_src_uri: Single StorageUri from wildcard expansion of src_uri. |
+ src_uri_names_container: True if src_uri names a container (including the |
+ case of a wildcard-named bucket subdir (like gs://bucket/abc, |
+          where gs://bucket/abc/* matched some objects)). Note that this is |
+          additional semantics that src_uri.names_container() doesn't understand |
+ because the latter only understands StorageUris, not wildcards. |
+ src_uri_expands_to_multi: True if src_uri expanded to multiple URIs. |
+ have_multiple_srcs: True if this is a multi-source request. This can be |
+ true if src_uri wildcard-expanded to multiple URIs or if there were |
+ multiple source URIs in the request. |
+ exp_dst_uri: the expanded StorageUri requested for the cp destination. |
+ Final written path is constructed from this plus a context-dependent |
+ variant of src_uri. |
+ have_existing_dest_subdir: bool indicator whether dest is an existing |
+ subdirectory. |
+ |
+ Returns: |
+ StorageUri to use for copy. |
+ |
+ Raises: |
+ CommandException if destination object name not specified for |
+ source and source is a stream. |
+ """ |
+ if self._ShouldTreatDstUriAsSingleton( |
+ have_multiple_srcs, have_existing_dest_subdir, exp_dst_uri): |
+ # We're copying one file or object to one file or object. |
+ return exp_dst_uri |
+ |
+ if exp_src_uri.is_stream(): |
+ if exp_dst_uri.names_container(): |
+ raise CommandException('Destination object name needed when ' |
+ 'source is a stream') |
+ return exp_dst_uri |
+ |
+ if not self.recursion_requested and not have_multiple_srcs: |
+ # We're copying one file or object to a subdirectory. Append final comp |
+ # of exp_src_uri to exp_dst_uri. |
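+      # For example, copying dir1/f.txt to gs://bucket/subdir produces the |
+      # object gs://bucket/subdir/f.txt. |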
+ src_final_comp = exp_src_uri.object_name.rpartition(src_uri.delim)[-1] |
+ return self.suri_builder.StorageUri('%s%s%s' % ( |
+ exp_dst_uri.uri.rstrip(exp_dst_uri.delim), exp_dst_uri.delim, |
+ src_final_comp)) |
+ |
+ # Else we're copying multiple sources to a directory, bucket, or a bucket |
+ # "sub-directory". |
+ |
+ # Ensure exp_dst_uri ends in delim char if we're doing a multi-src copy or |
+ # a copy to a directory. (The check for copying to a directory needs |
+ # special-case handling so that the command: |
+ # gsutil cp gs://bucket/obj dir |
+ # will turn into file://dir/ instead of file://dir -- the latter would cause |
+ # the file "dirobj" to be created.) |
+ # Note: need to check have_multiple_srcs or src_uri.names_container() |
+ # because src_uri could be a bucket containing a single object, named |
+ # as gs://bucket. |
+ if ((have_multiple_srcs or src_uri.names_container() |
+ or os.path.isdir(exp_dst_uri.object_name)) |
+ and not exp_dst_uri.uri.endswith(exp_dst_uri.delim)): |
+ exp_dst_uri = exp_dst_uri.clone_replace_name( |
+ '%s%s' % (exp_dst_uri.object_name, exp_dst_uri.delim) |
+ ) |
+ |
+ # Making naming behavior match how things work with local Linux cp and mv |
+ # operations depends on many factors, including whether the destination is a |
+ # container, the plurality of the source(s), and whether the mv command is |
+ # being used: |
+ # 1. For the "mv" command that specifies a non-existent destination subdir, |
+ # renaming should occur at the level of the src subdir, vs appending that |
+ # subdir beneath the dst subdir like is done for copying. For example: |
+ # gsutil rm -R gs://bucket |
+ # gsutil cp -R dir1 gs://bucket |
+ # gsutil cp -R dir2 gs://bucket/subdir1 |
+ # gsutil mv gs://bucket/subdir1 gs://bucket/subdir2 |
+ # would (if using cp naming behavior) end up with paths like: |
+ # gs://bucket/subdir2/subdir1/dir2/.svn/all-wcprops |
+ # whereas mv naming behavior should result in: |
+ # gs://bucket/subdir2/dir2/.svn/all-wcprops |
+ # 2. Copying from directories, buckets, or bucket subdirs should result in |
+ # objects/files mirroring the source directory hierarchy. For example: |
+ # gsutil cp dir1/dir2 gs://bucket |
+    #    should create the object gs://bucket/dir2/file2 (assuming dir1/dir2 |
+ # contains file2). |
+ # To be consistent with Linux cp behavior, there's one more wrinkle when |
+ # working with subdirs: The resulting object names depend on whether the |
+ # destination subdirectory exists. For example, if gs://bucket/subdir |
+ # exists, the command: |
+ # gsutil cp -R dir1/dir2 gs://bucket/subdir |
+ # should create objects named like gs://bucket/subdir/dir2/a/b/c. In |
+ # contrast, if gs://bucket/subdir does not exist, this same command |
+ # should create objects named like gs://bucket/subdir/a/b/c. |
+ # 3. Copying individual files or objects to dirs, buckets or bucket subdirs |
+ # should result in objects/files named by the final source file name |
+ # component. Example: |
+ # gsutil cp dir1/*.txt gs://bucket |
+ # should create the objects gs://bucket/f1.txt and gs://bucket/f2.txt, |
+ # assuming dir1 contains f1.txt and f2.txt. |
+ |
+ if (self.perform_mv and self.recursion_requested |
+ and src_uri_expands_to_multi and not have_existing_dest_subdir): |
+ # Case 1. Handle naming rules for bucket subdir mv. Here we want to |
+ # line up the src_uri against its expansion, to find the base to build |
+ # the new name. For example, running the command: |
+ # gsutil mv gs://bucket/abcd gs://bucket/xyz |
+ # when processing exp_src_uri=gs://bucket/abcd/123 |
+ # exp_src_uri_tail should become /123 |
+ # Note: mv.py code disallows wildcard specification of source URI. |
+ exp_src_uri_tail = exp_src_uri.uri[len(src_uri.uri):] |
+ dst_key_name = '%s/%s' % (exp_dst_uri.object_name.rstrip('/'), |
+ exp_src_uri_tail.strip('/')) |
+ return exp_dst_uri.clone_replace_name(dst_key_name) |
+ |
+ if src_uri_names_container and not exp_dst_uri.names_file(): |
+ # Case 2. Build dst_key_name from subpath of exp_src_uri past |
+ # where src_uri ends. For example, for src_uri=gs://bucket/ and |
+ # exp_src_uri=gs://bucket/src_subdir/obj, dst_key_name should be |
+ # src_subdir/obj. |
+ src_uri_path_sans_final_dir = _GetPathBeforeFinalDir(src_uri) |
+ dst_key_name = exp_src_uri.uri[ |
+ len(src_uri_path_sans_final_dir):].lstrip(src_uri.delim) |
+ # Handle case where dst_uri is a non-existent subdir. |
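+      # In that case the top-level source dir name is stripped, e.g. |
+      # 'dir2/a/b/c' becomes 'a/b/c'. |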
+ if not have_existing_dest_subdir: |
+ dst_key_name = dst_key_name.partition(src_uri.delim)[-1] |
+ # Handle special case where src_uri was a directory named with '.' or |
+ # './', so that running a command like: |
+ # gsutil cp -r . gs://dest |
+ # will produce obj names of the form gs://dest/abc instead of |
+ # gs://dest/./abc. |
+ if dst_key_name.startswith('.%s' % os.sep): |
+ dst_key_name = dst_key_name[2:] |
+ |
+ else: |
+ # Case 3. |
+ dst_key_name = exp_src_uri.object_name.rpartition(src_uri.delim)[-1] |
+ |
+ if (exp_dst_uri.is_file_uri() |
+ or self._ShouldTreatDstUriAsBucketSubDir( |
+ have_multiple_srcs, exp_dst_uri, have_existing_dest_subdir)): |
+ if exp_dst_uri.object_name.endswith(exp_dst_uri.delim): |
+ dst_key_name = '%s%s%s' % ( |
+ exp_dst_uri.object_name.rstrip(exp_dst_uri.delim), |
+ exp_dst_uri.delim, dst_key_name) |
+ else: |
+ delim = exp_dst_uri.delim if exp_dst_uri.object_name else '' |
+ dst_key_name = '%s%s%s' % (exp_dst_uri.object_name, delim, dst_key_name) |
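+ # Illustrative sketch (assumed example values): with dst_key_name of
+ # 'dir2/a/b/c' and an existing bucket subdir exp_dst_uri of
+ # gs://bucket/subdir/ (trailing delimiter), the block above joins them into
+ # 'subdir/dir2/a/b/c', so the URI returned below is
+ # gs://bucket/subdir/dir2/a/b/c.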
+ |
+ return exp_dst_uri.clone_replace_name(dst_key_name) |
+ |
+ def _FixWindowsNaming(self, src_uri, dst_uri): |
+ """ |
+ Rewrites the destination URI built by _ConstructDstUri() to translate |
+ Windows pathnames to cloud pathnames if needed. |
+ |
+ Args: |
+ src_uri: Source URI to be copied. |
+ dst_uri: The destination URI built by _ConstructDstUri(). |
+ |
+ Returns: |
+ StorageUri to use for copy. |
+ """ |
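+ # Illustrative sketch (assumed example values, not from a real run): on
+ # Windows a local source such as file://dir1\dir2 can lead _ConstructDstUri()
+ # to build a dst_uri like gs://bucket/dir2\file.txt; the re.sub() below
+ # rewrites the backslashes so the object is stored as
+ # gs://bucket/dir2/file.txt.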
+ if (src_uri.is_file_uri() and src_uri.delim == '\\' |
+ and dst_uri.is_cloud_uri()): |
+ trans_uri_str = re.sub(r'\\', '/', dst_uri.uri) |
+ dst_uri = self.suri_builder.StorageUri(trans_uri_str) |
+ return dst_uri |
+ |
+ # Command entry point. |
+ def RunCommand(self): |
+ |
+ # Inner funcs. |
+ def _CopyExceptionHandler(e): |
+ """Simple exception handler to allow post-completion status.""" |
+ self.THREADED_LOGGER.error(str(e)) |
+ self.copy_failure_count += 1 |
+ |
+ def _CopyFunc(name_expansion_result): |
+ """Worker function for performing the actual copy (and rm, for mv).""" |
+ if self.perform_mv: |
+ cmd_name = 'mv' |
+ else: |
+ cmd_name = self.command_name |
+ src_uri = self.suri_builder.StorageUri( |
+ name_expansion_result.GetSrcUriStr()) |
+ exp_src_uri = self.suri_builder.StorageUri( |
+ name_expansion_result.GetExpandedUriStr()) |
+ src_uri_names_container = name_expansion_result.NamesContainer() |
+ src_uri_expands_to_multi = name_expansion_result.NamesContainer() |
+ have_multiple_srcs = name_expansion_result.IsMultiSrcRequest() |
+ have_existing_dest_subdir = ( |
+ name_expansion_result.HaveExistingDstContainer()) |
+ |
+ if src_uri.names_provider(): |
+ raise CommandException( |
+ 'The %s command does not allow provider-only source URIs (%s)' % |
+ (cmd_name, src_uri)) |
+ if have_multiple_srcs: |
+ self._InsistDstUriNamesContainer(exp_dst_uri, |
+ have_existing_dst_container, |
+ cmd_name) |
+ |
+ if self.perform_mv: |
+ if name_expansion_result.NamesContainer(): |
+ # Use recursion_requested when performing name expansion for the |
+ # directory mv case so we can determine if any of the source URIs are |
+ # directories (and then use cp -R and rm -R to perform the move, to
+ # match the behavior of Linux mv, which when moving a directory moves
+ # all the contained files).
+ self.recursion_requested = True |
+ # Disallow wildcard src URIs when moving directories, as supporting it |
+ # would make the name transformation too complex and would also be |
+ # dangerous (e.g., someone could accidentally move many objects to the |
+ # wrong name, or accidentally overwrite many objects). |
+ if ContainsWildcard(src_uri): |
+ raise CommandException('The mv command disallows naming source ' |
+ 'directories using wildcards') |
+ |
+ if (exp_dst_uri.is_file_uri() |
+ and not os.path.exists(exp_dst_uri.object_name) |
+ and have_multiple_srcs): |
+ os.makedirs(exp_dst_uri.object_name) |
+ |
+ dst_uri = self._ConstructDstUri(src_uri, exp_src_uri, |
+ src_uri_names_container, |
+ src_uri_expands_to_multi, |
+ have_multiple_srcs, exp_dst_uri, |
+ have_existing_dest_subdir) |
+ dst_uri = self._FixWindowsNaming(src_uri, dst_uri) |
+ |
+ self._CheckForDirFileConflict(exp_src_uri, dst_uri) |
+ if self._SrcDstSame(exp_src_uri, dst_uri): |
+ raise CommandException('%s: "%s" and "%s" are the same file - ' |
+ 'abort.' % (cmd_name, exp_src_uri, dst_uri)) |
+ |
+ if dst_uri.is_cloud_uri() and dst_uri.is_version_specific: |
+ raise CommandException('%s: a version-specific URI\n(%s)\ncannot be ' |
+ 'the destination for gsutil cp - abort.' |
+ % (cmd_name, dst_uri)) |
+ |
+ elapsed_time = bytes_transferred = 0
+ result_uri = None
+ try: |
+ (elapsed_time, bytes_transferred, result_uri) = ( |
+ self._PerformCopy(exp_src_uri, dst_uri)) |
+ except Exception, e: |
+ if self._IsNoClobberServerException(e): |
+ if not self.quiet: |
+ self.THREADED_LOGGER.info('Rejected (noclobber): %s' % dst_uri.uri) |
+ elif self.continue_on_error: |
+ if not self.quiet: |
+ self.THREADED_LOGGER.error('Error copying %s: %s' % (src_uri.uri, |
+ str(e))) |
+ self.copy_failure_count += 1 |
+ else: |
+ raise |
+ if self.print_ver and result_uri is not None:
+ # Some cases don't return a version-specific URI (e.g., if destination |
+ # is a file). |
+ if hasattr(result_uri, 'version_specific_uri'): |
+ self.THREADED_LOGGER.info('Created: %s' % |
+ result_uri.version_specific_uri) |
+ else: |
+ self.THREADED_LOGGER.info('Created: %s' % result_uri.uri) |
+ |
+ # TODO: If we ever use -n (noclobber) with -M (move) (not possible today |
+ # since we call copy internally from move and don't specify the -n flag) |
+ # we'll need to only remove the source when we have not skipped the |
+ # destination. |
+ if self.perform_mv: |
+ if not self.quiet: |
+ self.THREADED_LOGGER.info('Removing %s...', exp_src_uri) |
+ exp_src_uri.delete_key(validate=False, headers=self.headers) |
+ stats_lock.acquire() |
+ self.total_elapsed_time += elapsed_time |
+ self.total_bytes_transferred += bytes_transferred |
+ stats_lock.release() |
+ |
+ # Start of RunCommand code. |
+ self._ParseArgs() |
+ |
+ self.total_elapsed_time = self.total_bytes_transferred = 0 |
+ if self.args[-1] == '-' or self.args[-1] == 'file://-': |
+ self._HandleStreamingDownload() |
+ return 0 |
+ |
+ if self.read_args_from_stdin: |
+ if len(self.args) != 1: |
+ raise CommandException('Source URIs cannot be specified with -I option') |
+ uri_strs = self._StdinIterator() |
+ else: |
+ if len(self.args) < 2: |
+ raise CommandException('Wrong number of arguments for "cp" command.') |
+ uri_strs = self.args[0:len(self.args)-1] |
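+ # For reference, the -I branch above reads the names to copy from stdin, one
+ # per line; an illustrative (hypothetical) invocation:
+ #   some_program | gsutil -m cp -I gs://my_bucket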
+ |
+ (exp_dst_uri, have_existing_dst_container) = self._ExpandDstUri( |
+ self.args[-1]) |
+ name_expansion_iterator = NameExpansionIterator( |
+ self.command_name, self.proj_id_handler, self.headers, self.debug, |
+ self.bucket_storage_uri_class, uri_strs, |
+ self.recursion_requested or self.perform_mv, |
+ have_existing_dst_container) |
+ |
+ # Use a lock to ensure accurate statistics in the face of |
+ # multi-threading/multi-processing. |
+ stats_lock = threading.Lock() |
+ |
+ # Tracks if any copies failed. |
+ self.copy_failure_count = 0 |
+ |
+ # Start the clock. |
+ start_time = time.time() |
+ |
+ # Tuple of attributes to share/manage across multiple processes in |
+ # parallel (-m) mode. |
+ shared_attrs = ('copy_failure_count', 'total_bytes_transferred') |
+ |
+ # Perform copy requests in parallel (-m) mode, if requested, using |
+ # configured number of parallel processes and threads. Otherwise, |
+ # perform requests with sequential function calls in current process. |
+ self.Apply(_CopyFunc, name_expansion_iterator, _CopyExceptionHandler, |
+ shared_attrs) |
+ if self.debug: |
+ print 'total_bytes_transferred:' + str(self.total_bytes_transferred) |
+ |
+ end_time = time.time() |
+ self.total_elapsed_time = end_time - start_time |
+ |
+ # Sometimes, particularly when running unit tests, the total elapsed time |
+ # is really small. On Windows, the timer resolution is too small and |
+ # causes total_elapsed_time to be zero. |
+ try: |
+ float(self.total_bytes_transferred) / float(self.total_elapsed_time) |
+ except ZeroDivisionError: |
+ self.total_elapsed_time = 0.01 |
+ |
+ self.total_bytes_per_second = (float(self.total_bytes_transferred) / |
+ float(self.total_elapsed_time)) |
+ |
+ if self.debug == 3: |
+ # Note that this only counts the actual GET and PUT bytes for the copy |
+ # - not any transfers for doing wildcard expansion, the initial HEAD |
+ # request boto performs when doing a bucket.get_key() operation, etc. |
+ if self.total_bytes_transferred != 0: |
+ self.THREADED_LOGGER.info( |
+ 'Total bytes copied=%d, total elapsed time=%5.3f secs (%sps)', |
+ self.total_bytes_transferred, self.total_elapsed_time, |
+ MakeHumanReadable(self.total_bytes_per_second)) |
+ if self.copy_failure_count: |
+ plural_str = '' |
+ if self.copy_failure_count > 1: |
+ plural_str = 's' |
+ raise CommandException('%d file%s/object%s could not be transferred.' % ( |
+ self.copy_failure_count, plural_str, plural_str)) |
+ |
+ return 0 |
+ |
+ def _ParseArgs(self): |
+ self.perform_mv = False |
+ self.exclude_symlinks = False |
+ self.quiet = False |
+ self.no_clobber = False |
+ self.continue_on_error = False |
+ self.daisy_chain = False |
+ self.read_args_from_stdin = False |
+ self.print_ver = False |
+ # self.recursion_requested initialized in command.py (so can be checked |
+ # in parent class for all commands). |
+ if self.sub_opts: |
+ for o, unused_a in self.sub_opts: |
+ if o == '-c': |
+ self.continue_on_error = True |
+ elif o == '-D': |
+ self.daisy_chain = True |
+ elif o == '-e': |
+ self.exclude_symlinks = True |
+ elif o == '-I': |
+ self.read_args_from_stdin = True |
+ elif o == '-M': |
+ # Note that we signal to the cp command to perform a move (copy |
+ # followed by remove) and use directory-move naming rules by passing |
+ # the undocumented (for internal use) -M option when running the cp |
+ # command from mv.py. |
+ self.perform_mv = True |
+ elif o == '-n': |
+ self.no_clobber = True |
+ elif o == '-q': |
+ self.quiet = True |
+ elif o == '-r' or o == '-R': |
+ self.recursion_requested = True |
+ elif o == '-v': |
+ self.print_ver = True |
+ |
+ def _HandleStreamingDownload(self): |
+ # Destination is <STDOUT>. Save the real stdout and point sys.stdout at
+ # <STDERR>, so debug messages don't get mixed into the streamed data.
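+ # This path is reached when the final argument is '-' (or 'file://-'), e.g.
+ # an illustrative invocation: gsutil cp gs://bucket/obj - | some_consumer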
+ stdout_fp = sys.stdout |
+ sys.stdout = sys.stderr |
+ did_some_work = False |
+ for uri_str in self.args[0:len(self.args)-1]: |
+ for uri in self.WildcardIterator(uri_str).IterUris(): |
+ did_some_work = True |
+ key = uri.get_key(False, self.headers) |
+ (elapsed_time, bytes_transferred) = self._PerformDownloadToStream( |
+ key, uri, stdout_fp, self.headers) |
+ self.total_elapsed_time += elapsed_time |
+ self.total_bytes_transferred += bytes_transferred |
+ if not did_some_work: |
+ raise CommandException('No URIs matched') |
+ if self.debug == 3: |
+ if self.total_bytes_transferred != 0: |
+ self.THREADED_LOGGER.info( |
+ 'Total bytes copied=%d, total elapsed time=%5.3f secs (%sps)', |
+ self.total_bytes_transferred, self.total_elapsed_time, |
+ MakeHumanReadable(float(self.total_bytes_transferred) / |
+ float(self.total_elapsed_time))) |
+ |
+ def _StdinIterator(self): |
+ """A generator function that returns lines from stdin.""" |
+ for line in sys.stdin: |
+ # Strip trailing whitespace (including any CR/LF).
+ yield line.rstrip() |
+ |
+ def _SrcDstSame(self, src_uri, dst_uri): |
+ """Checks if src_uri and dst_uri represent the same object or file. |
+ |
+ We don't handle anything about hard or symbolic links. |
+ |
+ Args: |
+ src_uri: Source StorageUri. |
+ dst_uri: Destination StorageUri. |
+ |
+ Returns: |
+ Bool indicator. |
+ """ |
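+ # For example, file://dir/./f.txt and file://dir/f.txt normalize to the same
+ # path below and are therefore reported as the same file.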
+ if src_uri.is_file_uri() and dst_uri.is_file_uri(): |
+ # Translate a/b/./c to a/b/c, so src=dst comparison below works. |
+ new_src_path = os.path.normpath(src_uri.object_name) |
+ new_dst_path = os.path.normpath(dst_uri.object_name) |
+ return (src_uri.clone_replace_name(new_src_path).uri == |
+ dst_uri.clone_replace_name(new_dst_path).uri) |
+ else: |
+ return (src_uri.uri == dst_uri.uri and |
+ src_uri.generation == dst_uri.generation and |
+ src_uri.version_id == dst_uri.version_id) |
+ |
+ def _ShouldTreatDstUriAsBucketSubDir(self, have_multiple_srcs, dst_uri, |
+ have_existing_dest_subdir): |
+ """ |
+ Checks whether dst_uri should be treated as a bucket "sub-directory". The |
+ decision about whether something constitutes a bucket "sub-directory" |
+ depends on whether there are multiple sources in this request and whether |
+ there is an existing bucket subdirectory. For example, when running the |
+ command: |
+ gsutil cp file gs://bucket/abc |
+ if there's no existing gs://bucket/abc bucket subdirectory we should copy |
+ file to the object gs://bucket/abc. In contrast, if |
+ there's an existing gs://bucket/abc bucket subdirectory we should copy |
+ file to gs://bucket/abc/file. And regardless of whether gs://bucket/abc |
+ exists, when running the command: |
+ gsutil cp file1 file2 gs://bucket/abc |
+ we should copy file1 to gs://bucket/abc/file1 (and similarly for file2). |
+ |
+ Note that we don't disallow naming a bucket "sub-directory" where there's |
+ already an object at that URI. For example it's legitimate (albeit |
+ confusing) to have an object called gs://bucket/dir and |
+ then run the command |
+ gsutil cp file1 file2 gs://bucket/dir |
+ Doing so will end up with objects gs://bucket/dir, gs://bucket/dir/file1, |
+ and gs://bucket/dir/file2. |
+ |
+ Args: |
+ have_multiple_srcs: Bool indicator of whether this is a multi-source |
+ operation. |
+ dst_uri: StorageUri to check. |
+ have_existing_dest_subdir: bool indicator whether dest is an existing |
+ subdirectory. |
+ |
+ Returns: |
+ bool indicator. |
+ """ |
+ return ((have_multiple_srcs and dst_uri.is_cloud_uri()) |
+ or (have_existing_dest_subdir)) |
+ |
+ def _ShouldTreatDstUriAsSingleton(self, have_multiple_srcs, |
+ have_existing_dest_subdir, dst_uri): |
+ """ |
+ Checks that dst_uri names a singleton (file or object) after |
+ dir/wildcard expansion. The decision is more nuanced than simply |
+ dst_uri.names_singleton() because of the possibility that an object path
+ might name a bucket sub-directory. |
+ |
+ Args: |
+ have_multiple_srcs: Bool indicator of whether this is a multi-source |
+ operation. |
+ have_existing_dest_subdir: bool indicator whether dest is an existing |
+ subdirectory. |
+ dst_uri: StorageUri to check. |
+ |
+ Returns: |
+ bool indicator. |
+ """ |
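+ # For example (illustrative): 'gsutil cp f1 f2 gs://bucket/abc' never treats
+ # gs://bucket/abc as a singleton, whereas 'gsutil cp f1 gs://bucket/abc' does,
+ # provided no gs://bucket/abc/ subdirectory already exists.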
+ if have_multiple_srcs: |
+ # Only a file meets the criteria in this case. |
+ return dst_uri.names_file() |
+ return not have_existing_dest_subdir and dst_uri.names_singleton() |
+ |
+ def _IsNoClobberServerException(self, e): |
+ """ |
+ Checks whether the exception indicates the copy was rejected because the
+ destination already exists and we specified in the header that we didn't
+ want it clobbered.
+ |
+ Args: |
+ e: The Exception that was generated by a failed copy operation |
+ |
+ Returns: |
+ bool indicator - True indicates that the server rejected the copy rather
+ than clobber an existing object.
+ """ |
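+ # HTTP 412 (Precondition Failed) is what the service returns when the
+ # no-clobber precondition header sent with the copy request is not satisfied
+ # because the destination object already exists.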
+ return self.no_clobber and ( |
+ (isinstance(e, GSResponseError) and e.status==412) or |
+ (isinstance(e, ResumableUploadException) and 'code 412' in e.message)) |
+ |
+def _GetPathBeforeFinalDir(uri): |
+ """ |
+ Returns the part of the path before the final directory component for the |
+ given URI, handling cases for file system directories, buckets, and bucket
+ subdirectories. Example: for gs://bucket/dir/ we'll return 'gs://bucket',
+ and for file://dir we'll return 'file://'.
+ |
+ Args: |
+ uri: StorageUri. |
+ |
+ Returns: |
+ String name of above-described path, sans final path separator. |
+ """ |
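+ # Additional illustrative case (derived from the code below): for a bucket
+ # subdir such as gs://bucket/dir/subdir/ the trailing delimiter is stripped
+ # and the final path component dropped, yielding gs://bucket/dir.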
+ sep = uri.delim |
+ assert not uri.names_file() |
+ if uri.names_directory(): |
+ past_scheme = uri.uri[len('file://'):] |
+ if past_scheme.find(sep) == -1: |
+ return 'file://' |
+ else: |
+ return 'file://%s' % past_scheme.rstrip(sep).rpartition(sep)[0] |
+ if uri.names_bucket(): |
+ return '%s://' % uri.scheme |
+ # Else it names a bucket subdir. |
+ return uri.uri.rstrip(sep).rpartition(sep)[0] |
+ |
+def _hash_filename(filename): |
+ """ |
+ Apply a hash function (SHA1) to shorten the passed file name. The spec |
+ for the hashed file name is as follows: |
+ |
+ TRACKER_<hash>.<trailing>
+ |
+ where hash is a SHA1 hash on the original file name and trailing is |
+ the last 16 chars from the original file name. Max file name lengths |
+ vary by operating system so the goal of this function is to ensure |
+ the hashed version takes fewer than 100 characters. |
+ |
+ Args: |
+ filename: file name to be hashed. |
+ |
+ Returns: |
+ shorter, hashed version of passed file name |
+ """ |
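+ # Illustrative sketch (hypothetical name): a long tracker file name whose
+ # last 16 characters are 'long_object_name' maps to
+ # 'TRACKER_' + 40 hex digits + '.long_object_name', i.e. 65 characters total.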
+ if not isinstance(filename, unicode): |
+ filename = unicode(filename, 'utf8').encode('utf-8') |
+ m = hashlib.sha1(filename) |
+ return "TRACKER_" + m.hexdigest() + '.' + filename[-16:] |