Index: third_party/gsutil/gslib/commands/rsync.py |
diff --git a/third_party/gsutil/gslib/commands/rsync.py b/third_party/gsutil/gslib/commands/rsync.py |
new file mode 100644 |
index 0000000000000000000000000000000000000000..4eb9b92e92ad9eb1155393634fc999e8f3cbb7e0 |
--- /dev/null |
+++ b/third_party/gsutil/gslib/commands/rsync.py |
@@ -0,0 +1,1032 @@ |
+# -*- coding: utf-8 -*- |
+# Copyright 2014 Google Inc. All Rights Reserved. |
+# |
+# Licensed under the Apache License, Version 2.0 (the "License"); |
+# you may not use this file except in compliance with the License. |
+# You may obtain a copy of the License at |
+# |
+# http://www.apache.org/licenses/LICENSE-2.0 |
+# |
+# Unless required by applicable law or agreed to in writing, software |
+# distributed under the License is distributed on an "AS IS" BASIS, |
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
+# See the License for the specific language governing permissions and |
+# limitations under the License. |
+"""Implementation of Unix-like rsync command.""" |
+ |
+from __future__ import absolute_import |
+ |
+import errno |
+import heapq |
+import io |
+from itertools import islice |
+import os |
+import re |
+import tempfile |
+import textwrap |
+import traceback |
+import urllib |
+ |
+from boto import config |
+import crcmod |
+ |
+from gslib import copy_helper |
+from gslib.cloud_api import NotFoundException |
+from gslib.command import Command |
+from gslib.command import DummyArgChecker |
+from gslib.command_argument import CommandArgument |
+from gslib.copy_helper import CreateCopyHelperOpts |
+from gslib.copy_helper import SkipUnsupportedObjectError |
+from gslib.cs_api_map import ApiSelector |
+from gslib.exception import CommandException |
+from gslib.hashing_helper import CalculateB64EncodedCrc32cFromContents |
+from gslib.hashing_helper import CalculateB64EncodedMd5FromContents |
+from gslib.hashing_helper import SLOW_CRCMOD_WARNING |
+from gslib.plurality_checkable_iterator import PluralityCheckableIterator |
+from gslib.sig_handling import GetCaughtSignals |
+from gslib.sig_handling import RegisterSignalHandler |
+from gslib.storage_url import StorageUrlFromString |
+from gslib.util import GetCloudApiInstance |
+from gslib.util import IsCloudSubdirPlaceholder |
+from gslib.util import TEN_MIB |
+from gslib.util import UsingCrcmodExtension |
+from gslib.util import UTF8 |
+from gslib.wildcard_iterator import CreateWildcardIterator |
+ |
+ |
+_SYNOPSIS = """ |
+ gsutil rsync [-c] [-C] [-d] [-e] [-n] [-p] [-r] [-U] [-x] src_url dst_url |
+""" |
+ |
+_DETAILED_HELP_TEXT = (""" |
+<B>SYNOPSIS</B> |
+""" + _SYNOPSIS + """ |
+ |
+ |
+<B>DESCRIPTION</B> |
+ The gsutil rsync command makes the contents under dst_url the same as the |
+ contents under src_url, by copying any missing files/objects, and (if the |
+ -d option is specified) deleting any extra files/objects. For example, to |
+ make gs://mybucket/data match the contents of the local directory "data" |
+ you could do: |
+ |
+ gsutil rsync -d data gs://mybucket/data |
+ |
+ To recurse into directories use the -r option: |
+ |
+ gsutil rsync -d -r data gs://mybucket/data |
+ |
+ To copy only new/changed files without deleting extra files from |
+ gs://mybucket/data leave off the -d option: |
+ |
+ gsutil rsync -r data gs://mybucket/data |
+ |
+ If you have a large number of objects to synchronize you might want to use the |
+ gsutil -m option, to perform parallel (multi-threaded/multi-processing) |
+ synchronization: |
+ |
+ gsutil -m rsync -d -r data gs://mybucket/data |
+ |
+ The -m option typically will provide a large performance boost if either the |
+ source or destination (or both) is a cloud URL. If both source and |
+ destination are file URLs the -m option will typically thrash the disk and |
+ slow synchronization down. |
+ |
+ To make the local directory "data" the same as the contents of |
+ gs://mybucket/data: |
+ |
+ gsutil rsync -d -r gs://mybucket/data data |
+ |
+ To make the contents of gs://mybucket2 the same as gs://mybucket1: |
+ |
+ gsutil rsync -d -r gs://mybucket1 gs://mybucket2 |
+ |
+ You can also mirror data across local directories: |
+ |
+ gsutil rsync -d -r dir1 dir2 |
+ |
+ To mirror your content across clouds: |
+ |
+ gsutil rsync -d -r gs://my-gs-bucket s3://my-s3-bucket |
+ |
+ Note: If you are synchronizing a large amount of data between clouds you might |
+ consider setting up a |
+ `Google Compute Engine <https://cloud.google.com/products/compute-engine>`_ |
+ account and running gsutil there. Since cross-provider gsutil data transfers |
+ flow through the machine where gsutil is running, doing this can make your |
+ transfer run significantly faster than running gsutil on your local |
+ workstation. |
+ |
+ |
+<B>BE CAREFUL WHEN USING -d OPTION!</B> |
+ The rsync -d option is very useful and commonly used, because it provides a |
+ means of making the contents of a destination bucket or directory match those |
+ of a source bucket or directory. However, please exercise caution when you |
+ use this option: It's possible to delete large amounts of data accidentally |
+ if, for example, you erroneously reverse source and destination. For example, |
+ if you meant to synchronize a local directory from a bucket in the cloud but |
+ instead run the command: |
+ |
+ gsutil -m rsync -r -d ./your-dir gs://your-bucket |
+ |
+ and your-dir is currently empty, you will quickly delete all of the objects in |
+ gs://your-bucket. |
+ |
+ You can also cause large amounts of data to be lost quickly by specifying a |
+ subdirectory of the destination as the source of an rsync. For example, the |
+ command: |
+ |
+ gsutil -m rsync -r -d gs://your-bucket/data gs://your-bucket |
+ |
+ would cause most or all of the objects in gs://your-bucket to be deleted |
+ (some objects may survive if there are any with names that sort lower than |
+ "data" under gs://your-bucket/data). |
+ |
+ In addition to paying careful attention to the source and destination you |
+  specify with the rsync command, there are two more safety measures you can |
+ take when using gsutil rsync -d: |
+ |
+ 1. Try running the command with the rsync -n option first, to see what it |
+ would do without actually performing the operations. For example, if |
+ you run the command: |
+ |
+ gsutil -m rsync -r -d -n gs://your-bucket/data gs://your-bucket |
+ |
+ it will be immediately evident that running that command without the -n |
+ option would cause many objects to be deleted. |
+ |
+ 2. Enable object versioning in your bucket, which will allow you to restore |
+ objects if you accidentally delete them. For more details see |
+ "gsutil help versions". |
+ |
+ |
+<B>IMPACT OF BUCKET LISTING EVENTUAL CONSISTENCY</B> |
+ The rsync command operates by listing the source and destination URLs, and |
+ then performing copy and remove operations according to the differences |
+ between these listings. Because bucket listing is eventually (not strongly) |
+ consistent, if you upload new objects or delete objects from a bucket and then |
+ immediately run gsutil rsync with that bucket as the source or destination, |
+ it's possible the rsync command will not see the recent updates and thus |
+  synchronize incorrectly. You can rerun the rsync operation later to correct |
+  any such incorrect synchronization. |
+ |
+ |
+<B>CHECKSUM VALIDATION AND FAILURE HANDLING</B> |
+ At the end of every upload or download, the gsutil rsync command validates |
+ that the checksum of the source file/object matches the checksum of the |
+ destination file/object. If the checksums do not match, gsutil will delete |
+ the invalid copy and print a warning message. This very rarely happens, but |
+ if it does, please contact gs-team@google.com. |
+ |
+ The rsync command will retry when failures occur, but if enough failures |
+ happen during a particular copy or delete operation the command will skip that |
+ object and move on. At the end of the synchronization run if any failures were |
+ not successfully retried, the rsync command will report the count of failures, |
+ and exit with non-zero status. At this point you can run the rsync command |
+ again, and it will attempt any remaining needed copy and/or delete operations. |
+ |
+ Note that there are cases where retrying will never succeed, such as if you |
+ don't have write permission to the destination bucket or if the destination |
+ path for some objects is longer than the maximum allowed length. |
+ |
+ For more details about gsutil's retry handling, please see |
+ "gsutil help retries". |
+ |
+ |
+<B>CHANGE DETECTION ALGORITHM</B> |
+ To determine if a file or object has changed gsutil rsync first checks whether |
+  the source and destination sizes match. If they match, it next compares |
+  their checksums, when checksums are available (see below). Unlike the Unix |
+ rsync command, gsutil rsync does not use timestamps to determine if the |
+ file/object changed, because the GCS API does not permit the caller to set an |
+ object's timestamp (hence, timestamps of identical files/objects cannot be |
+ made to match). |
+ |
+ Checksums will not be available in two cases: |
+ |
+ 1. When synchronizing to or from a file system. By default, gsutil does not |
+ checksum files, because of the slowdown caused when working with large |
+ files. You can cause gsutil to checksum files by using the gsutil rsync -c |
+ option, at the cost of increased local disk I/O and run time when working |
+ with large files. You should consider using the -c option if your files can |
+ change without changing sizes (e.g., if you have files that contain fixed |
+ width data, such as timestamps). |
+ |
+ 2. When comparing composite GCS objects with objects at a cloud provider that |
+ does not support CRC32C (which is the only checksum available for composite |
+ objects). See 'gsutil help compose' for details about composite objects. |
+ |
+ |
+<B>COPYING IN THE CLOUD AND METADATA PRESERVATION</B> |
+ If both the source and destination URL are cloud URLs from the same provider, |
+ gsutil copies data "in the cloud" (i.e., without downloading to and uploading |
+ from the machine where you run gsutil). In addition to the performance and |
+ cost advantages of doing this, copying in the cloud preserves metadata (like |
+ Content-Type and Cache-Control). In contrast, when you download data from the |
+ cloud it ends up in a file, which has no associated metadata. Thus, unless you |
+ have some way to hold on to or re-create that metadata, synchronizing a bucket |
+ to a directory in the local file system will not retain the metadata. |
+ |
+ Note that by default, the gsutil rsync command does not copy the ACLs of |
+ objects being synchronized and instead will use the default bucket ACL (see |
+ "gsutil help defacl"). You can override this behavior with the -p option (see |
+ OPTIONS below). |
+ |
+ |
+<B>SLOW CHECKSUMS</B> |
+  If you find that CRC32C checksum computation runs slowly, it is likely that |
+  you don't have a compiled CRC32C implementation on your system. Try running: |
+ |
+ gsutil ver -l |
+ |
+ If the output contains: |
+ |
+ compiled crcmod: False |
+ |
+ you are running a Python library for computing CRC32C, which is much slower |
+ than using the compiled code. For information on getting a compiled CRC32C |
+ implementation, see 'gsutil help crc32c'. |
+ |
+ |
+<B>LIMITATIONS</B> |
+ 1. The gsutil rsync command doesn't make the destination object's timestamps |
+ match those of the source object (it can't; timestamp setting is not |
+ allowed by the GCS API). |
+ |
+ 2. The gsutil rsync command ignores versioning, synchronizing only the live |
+ object versions in versioned buckets. |
+ |
+ |
+<B>OPTIONS</B> |
+ -c Causes the rsync command to compute checksums for files if the |
+                 sizes of the source and destination match, and then compare |
+                 checksums. This option increases local disk I/O and run time |
+                 if either src_url or dst_url is on the local file system. |
+ |
+ -C If an error occurs, continue to attempt to copy the remaining |
+ files. If errors occurred, gsutil's exit status will be non-zero |
+ even if this flag is set. This option is implicitly set when |
+ running "gsutil -m rsync...". Note: -C only applies to the |
+ actual copying operation. If an error occurs while iterating |
+ over the files in the local directory (e.g., invalid Unicode |
+ file name) gsutil will print an error message and abort. |
+ |
+ -d Delete extra files under dst_url not found under src_url. By |
+ default extra files are not deleted. Note: this option can |
+ delete data quickly if you specify the wrong source/destination |
+ combination. See the help section above, |
+ "BE CAREFUL WHEN USING -d OPTION!". |
+ |
+ -e Exclude symlinks. When specified, symbolic links will be |
+ ignored. |
+ |
+ -n Causes rsync to run in "dry run" mode, i.e., just outputting |
+ what would be copied or deleted without actually doing any |
+ copying/deleting. |
+ |
+ -p Causes ACLs to be preserved when synchronizing in the cloud. |
+ Note that this option has performance and cost implications when |
+ using the XML API, as it requires separate HTTP calls for |
+ interacting with ACLs. The performance issue can be mitigated to |
+ some degree by using gsutil -m rsync to cause parallel |
+ synchronization. Also, this option only works if you have OWNER |
+ access to all of the objects that are copied. |
+ |
+                 If you want all objects in the destination bucket to end up |
+                 with the same ACL, you can avoid the additional performance |
+                 and cost of rsync -p by instead setting a default object ACL |
+                 on that bucket. See 'gsutil help defacl'. |
+ |
+ -R, -r Causes directories, buckets, and bucket subdirectories to be |
+ synchronized recursively. If you neglect to use this option |
+ gsutil will make only the top-level directory in the source |
+ and destination URLs match, skipping any sub-directories. |
+ |
+ -U Skip objects with unsupported object types instead of failing. |
+                 Unsupported object types are Amazon S3 objects in the |
+                 GLACIER storage class. |
+ |
+ -x pattern Causes files/objects matching pattern to be excluded, i.e., any |
+ matching files/objects will not be copied or deleted. Note that |
+ the pattern is a Python regular expression, not a wildcard (so, |
+ matching any string ending in 'abc' would be specified using |
+ '.*abc' rather than '*abc'). Note also that the exclude path is |
+ always relative (similar to Unix rsync or tar exclude options). |
+ For example, if you run the command: |
+ |
+ gsutil rsync -x 'data./.*\\.txt' dir gs://my-bucket |
+ |
+ it will skip the file dir/data1/a.txt. |
+ |
+ You can use regex alternation to specify multiple exclusions, |
+ for example: |
+ |
+ gsutil rsync -x '.*\\.txt|.*\\.jpg' dir gs://my-bucket |
+""") |
+ |
+ |
+class _DiffAction(object): |
+ COPY = 'copy' |
+ REMOVE = 'remove' |
+ |
+ |
+_NA = '-' |
+_OUTPUT_BUFFER_SIZE = 64 * 1024 |
+_PROGRESS_REPORT_LISTING_COUNT = 10000 |
+ |
+ |
+# Tracks files we need to clean up at end or if interrupted. |
+_tmp_files = [] |
+ |
+ |
+# pylint: disable=unused-argument |
+def _HandleSignals(signal_num, cur_stack_frame): |
+ """Called when rsync command is killed with SIGINT, SIGQUIT or SIGTERM.""" |
+ CleanUpTempFiles() |
+ |
+ |
+def CleanUpTempFiles(): |
+ """Cleans up temp files. |
+ |
+ This function allows the main (RunCommand) function to clean up at end of |
+ operation, or if gsutil rsync is interrupted (e.g., via ^C). This is necessary |
+ because tempfile.NamedTemporaryFile doesn't allow the created file to be |
+ re-opened in read mode on Windows, so we have to use tempfile.mkstemp, which |
+ doesn't automatically delete temp files. |
+ """ |
+ try: |
+ for fname in _tmp_files: |
+ os.unlink(fname) |
+ except: # pylint: disable=bare-except |
+ pass |
+ |
+ |
+class _DiffToApply(object): |
+ """Class that encapsulates info needed to apply diff for one object.""" |
+ |
+ def __init__(self, src_url_str, dst_url_str, diff_action): |
+ """Constructor. |
+ |
+ Args: |
+ src_url_str: The source URL string, or None if diff_action is REMOVE. |
+ dst_url_str: The destination URL string. |
+ diff_action: _DiffAction to be applied. |
+ """ |
+ self.src_url_str = src_url_str |
+ self.dst_url_str = dst_url_str |
+ self.diff_action = diff_action |
+ |
+ |
+def _DiffToApplyArgChecker(command_instance, diff_to_apply): |
+ """Arg checker that skips symlinks if -e flag specified.""" |
+ if (diff_to_apply.diff_action == _DiffAction.REMOVE |
+ or not command_instance.exclude_symlinks): |
+ # No src URL is populated for REMOVE actions. |
+ return True |
+ exp_src_url = StorageUrlFromString(diff_to_apply.src_url_str) |
+ if exp_src_url.IsFileUrl() and os.path.islink(exp_src_url.object_name): |
+ command_instance.logger.info('Skipping symbolic link %s...', exp_src_url) |
+ return False |
+ return True |
+ |
+ |
+def _ComputeNeededFileChecksums(logger, src_url_str, src_size, src_crc32c, |
+ src_md5, dst_url_str, dst_size, dst_crc32c, |
+ dst_md5): |
+ """Computes any file checksums needed by _ObjectsMatch. |
+ |
+ Args: |
+ logger: logging.logger for outputting log messages. |
+ src_url_str: Source URL string. |
+ src_size: Source size |
+ src_crc32c: Source CRC32c. |
+ src_md5: Source MD5. |
+ dst_url_str: Destination URL string. |
+ dst_size: Destination size |
+ dst_crc32c: Destination CRC32c. |
+ dst_md5: Destination MD5. |
+ |
+ Returns: |
+ (src_crc32c, src_md5, dst_crc32c, dst_md5) |
+ """ |
+ src_url = StorageUrlFromString(src_url_str) |
+ dst_url = StorageUrlFromString(dst_url_str) |
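+  # Only hash a local file when the other side already has (or, being a local |
+  # file itself, can also compute) a matching hash type: CRC32C is preferred, |
+  # with MD5 as the fallback. |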
+ if src_url.IsFileUrl(): |
+ if dst_crc32c != _NA or dst_url.IsFileUrl(): |
+ if src_size > TEN_MIB: |
+        logger.info('Computing CRC32C for %s...', src_url_str) |
+ with open(src_url.object_name, 'rb') as fp: |
+ src_crc32c = CalculateB64EncodedCrc32cFromContents(fp) |
+ elif dst_md5 != _NA or dst_url.IsFileUrl(): |
+      if src_size > TEN_MIB: |
+        logger.info('Computing MD5 for %s...', src_url_str) |
+ with open(src_url.object_name, 'rb') as fp: |
+ src_md5 = CalculateB64EncodedMd5FromContents(fp) |
+ if dst_url.IsFileUrl(): |
+ if src_crc32c != _NA: |
+      if dst_size > TEN_MIB: |
+        logger.info('Computing CRC32C for %s...', dst_url_str) |
+ with open(dst_url.object_name, 'rb') as fp: |
+ dst_crc32c = CalculateB64EncodedCrc32cFromContents(fp) |
+ elif src_md5 != _NA: |
+ if dst_size > TEN_MIB: |
+        logger.info('Computing MD5 for %s...', dst_url_str) |
+ with open(dst_url.object_name, 'rb') as fp: |
+ dst_md5 = CalculateB64EncodedMd5FromContents(fp) |
+ return (src_crc32c, src_md5, dst_crc32c, dst_md5) |
+ |
+ |
+def _ListUrlRootFunc(cls, args_tuple, thread_state=None): |
+  """Worker function for listing files/objects under a URL to be sync'd. |
+ |
+  Outputs a sorted list to out_filename, formatted per _BuildTmpOutputLine. We |
+ sort the listed URLs because we don't want to depend on consistent sort |
+ order across file systems and cloud providers. |
+ |
+ Args: |
+ cls: Command instance. |
+ args_tuple: (base_url_str, out_file_name, desc), where base_url_str is |
+ top-level URL string to list; out_filename is name of file to |
+ which sorted output should be written; desc is 'source' or |
+ 'destination'. |
+ thread_state: gsutil Cloud API instance to use. |
+ """ |
+ gsutil_api = GetCloudApiInstance(cls, thread_state=thread_state) |
+ (base_url_str, out_filename, desc) = args_tuple |
+  # We sort while iterating over base_url_str, so the batched sorting overlaps |
+  # with collecting the listing. |
+ out_file = io.open(out_filename, mode='w', encoding=UTF8) |
+ try: |
+ _BatchSort(_FieldedListingIterator(cls, gsutil_api, base_url_str, desc), |
+ out_file) |
+ except Exception as e: # pylint: disable=broad-except |
+ # Abandon rsync if an exception percolates up to this layer - retryable |
+ # exceptions are handled in the lower layers, so we got a non-retryable |
+ # exception (like 404 bucket not found) and proceeding would either be |
+ # futile or could result in data loss - for example: |
+ # gsutil rsync -d gs://non-existent-bucket ./localdir |
+ # would delete files from localdir. |
+ cls.logger.error( |
+ 'Caught non-retryable exception while listing %s: %s' % |
+ (base_url_str, e)) |
+ cls.non_retryable_listing_failures = 1 |
+ out_file.close() |
+ |
+ |
+def _FieldedListingIterator(cls, gsutil_api, base_url_str, desc): |
+ """Iterator over base_url_str formatting output per _BuildTmpOutputLine. |
+ |
+ Args: |
+ cls: Command instance. |
+ gsutil_api: gsutil Cloud API instance to use for bucket listing. |
+ base_url_str: The top-level URL string over which to iterate. |
+ desc: 'source' or 'destination'. |
+ |
+ Yields: |
+ Output line formatted per _BuildTmpOutputLine. |
+ """ |
+ if cls.recursion_requested: |
+ wildcard = '%s/**' % base_url_str.rstrip('/\\') |
+ else: |
+ wildcard = '%s/*' % base_url_str.rstrip('/\\') |
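+  # For example (illustrative base URL): with base 'gs://bkt/data', recursive |
+  # listing uses the wildcard 'gs://bkt/data/**', while non-recursive listing |
+  # uses 'gs://bkt/data/*' (top level only). |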
+ i = 0 |
+ for blr in CreateWildcardIterator( |
+ wildcard, gsutil_api, debug=cls.debug, |
+ project_id=cls.project_id).IterObjects( |
+ # Request just the needed fields, to reduce bandwidth usage. |
+ bucket_listing_fields=['crc32c', 'md5Hash', 'name', 'size']): |
+ # Various GUI tools (like the GCS web console) create placeholder objects |
+ # ending with '/' when the user creates an empty directory. Normally these |
+ # tools should delete those placeholders once objects have been written |
+ # "under" the directory, but sometimes the placeholders are left around. |
+ # We need to filter them out here, otherwise if the user tries to rsync |
+ # from GCS to a local directory it will result in a directory/file |
+ # conflict (e.g., trying to download an object called "mydata/" where the |
+ # local directory "mydata" exists). |
+ url = blr.storage_url |
+ if IsCloudSubdirPlaceholder(url, blr=blr): |
+ cls.logger.info('Skipping cloud sub-directory placeholder object (%s) ' |
+ 'because such objects aren\'t needed in (and would ' |
+ 'interfere with) directories in the local file system', |
+ url) |
+ continue |
+ if (cls.exclude_symlinks and url.IsFileUrl() |
+ and os.path.islink(url.object_name)): |
+ continue |
+ if cls.exclude_pattern: |
+ str_to_check = url.url_string[len(base_url_str):] |
+ if str_to_check.startswith(url.delim): |
+ str_to_check = str_to_check[1:] |
+ if cls.exclude_pattern.match(str_to_check): |
+ continue |
+ i += 1 |
+ if i % _PROGRESS_REPORT_LISTING_COUNT == 0: |
+ cls.logger.info('At %s listing %d...', desc, i) |
+ yield _BuildTmpOutputLine(blr) |
+ |
+ |
+def _BuildTmpOutputLine(blr): |
+ """Builds line to output to temp file for given BucketListingRef. |
+ |
+ Args: |
+ blr: The BucketListingRef. |
+ |
+ Returns: |
+ The output line, formatted as _EncodeUrl(URL)<sp>size<sp>crc32c<sp>md5 |
+ where crc32c will only be present for GCS URLs, and md5 will only be |
+ present for cloud URLs that aren't composite objects. A missing field is |
+ populated with '-'. |
+ """ |
+ crc32c = _NA |
+ md5 = _NA |
+ url = blr.storage_url |
+ if url.IsFileUrl(): |
+ size = os.path.getsize(url.object_name) |
+ elif url.IsCloudUrl(): |
+ size = blr.root_object.size |
+ crc32c = blr.root_object.crc32c or _NA |
+ md5 = blr.root_object.md5Hash or _NA |
+ else: |
+ raise CommandException('Got unexpected URL type (%s)' % url.scheme) |
+ return '%s %d %s %s\n' % (_EncodeUrl(url.url_string), size, crc32c, md5) |
+ |
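+# Example _BuildTmpOutputLine output (illustrative; the hash values here are |
+# made up): a 1234-byte GCS object gs://bucket/obj.txt with both hashes |
+# present would be recorded as |
+#   'gs%3A%2F%2Fbucket%2Fobj.txt 1234 vJJ1EQ== rL0Y20zC+Fzt72VPzMSk2A==\n' |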
+ |
+def _EncodeUrl(url_string): |
+ """Encodes url_str with quote plus encoding and UTF8 character encoding. |
+ |
+ We use this for all URL encodings. |
+ |
+ Args: |
+ url_string: String URL to encode. |
+ |
+ Returns: |
+ encoded URL. |
+ """ |
+ return urllib.quote_plus(url_string.encode(UTF8)) |
+ |
+ |
+def _DecodeUrl(enc_url_string): |
+ """Inverts encoding from EncodeUrl. |
+ |
+ Args: |
+ enc_url_string: String URL to decode. |
+ |
+ Returns: |
+ decoded URL. |
+ """ |
+ return urllib.unquote_plus(enc_url_string).decode(UTF8) |
+ |
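+# Round-trip example for the two helpers above (illustrative): |
+#   _EncodeUrl(u'gs://bucket/file name') == 'gs%3A%2F%2Fbucket%2Ffile+name' |
+#   _DecodeUrl('gs%3A%2F%2Fbucket%2Ffile+name') == u'gs://bucket/file name' |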
+ |
+# pylint: disable=bare-except |
+def _BatchSort(in_iter, out_file): |
+ """Sorts input lines from in_iter and outputs to out_file. |
+ |
+ Sorts in batches as input arrives, so input file does not need to be loaded |
+ into memory all at once. Derived from Python Recipe 466302: Sorting big |
+ files the Python 2.4 way by Nicolas Lehuen. |
+ |
+ Sorted format is per _BuildTmpOutputLine. We're sorting on the entire line |
+ when we could just sort on the first record (URL); but the sort order is |
+ identical either way. |
+ |
+ Args: |
+ in_iter: Input iterator. |
+ out_file: Output file. |
+ """ |
+ # Note: If chunk_files gets very large we can run out of open FDs. See .boto |
+ # file comments about rsync_buffer_lines. If increasing rsync_buffer_lines |
+ # doesn't suffice (e.g., for someone synchronizing with a really large |
+ # bucket), an option would be to make gsutil merge in passes, never |
+ # opening all chunk files simultaneously. |
+ buffer_size = config.getint('GSUtil', 'rsync_buffer_lines', 32000) |
+ chunk_files = [] |
+ try: |
+ while True: |
+ current_chunk = sorted(islice(in_iter, buffer_size)) |
+ if not current_chunk: |
+ break |
+ output_chunk = io.open('%s-%06i' % (out_file.name, len(chunk_files)), |
+ mode='w+', encoding=UTF8) |
+ chunk_files.append(output_chunk) |
+ output_chunk.writelines(unicode(''.join(current_chunk))) |
+ output_chunk.flush() |
+ output_chunk.seek(0) |
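+    # heapq.merge lazily merges the already-sorted chunk files line by line, |
+    # so no chunk has to be loaded fully into memory. For example |
+    # (illustrative): list(heapq.merge(['a\n', 'c\n'], ['b\n'])) yields |
+    # ['a\n', 'b\n', 'c\n']. |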
+ out_file.writelines(heapq.merge(*chunk_files)) |
+ except IOError as e: |
+ if e.errno == errno.EMFILE: |
+ raise CommandException('\n'.join(textwrap.wrap( |
+ 'Synchronization failed because too many open file handles were ' |
+ 'needed while building synchronization state. Please see the ' |
+ 'comments about rsync_buffer_lines in your .boto config file for a ' |
+ 'possible way to address this problem.'))) |
+ raise |
+ finally: |
+ for chunk_file in chunk_files: |
+ try: |
+ chunk_file.close() |
+ os.remove(chunk_file.name) |
+ except: |
+ pass |
+ |
+ |
+class _DiffIterator(object): |
+ """Iterator yielding sequence of _DiffToApply objects.""" |
+ |
+ def __init__(self, command_obj, base_src_url, base_dst_url): |
+ self.command_obj = command_obj |
+ self.compute_file_checksums = command_obj.compute_file_checksums |
+ self.delete_extras = command_obj.delete_extras |
+ self.recursion_requested = command_obj.recursion_requested |
+ self.logger = self.command_obj.logger |
+ self.base_src_url = base_src_url |
+ self.base_dst_url = base_dst_url |
+ self.logger.info('Building synchronization state...') |
+ |
+ (src_fh, self.sorted_list_src_file_name) = tempfile.mkstemp( |
+ prefix='gsutil-rsync-src-') |
+ _tmp_files.append(self.sorted_list_src_file_name) |
+ (dst_fh, self.sorted_list_dst_file_name) = tempfile.mkstemp( |
+ prefix='gsutil-rsync-dst-') |
+ _tmp_files.append(self.sorted_list_dst_file_name) |
+ # Close the file handles; the file will be opened in write mode by |
+ # _ListUrlRootFunc. |
+ os.close(src_fh) |
+ os.close(dst_fh) |
+ |
+ # Build sorted lists of src and dst URLs in parallel. To do this, pass args |
+ # to _ListUrlRootFunc as tuple (base_url_str, out_filename, desc) |
+ # where base_url_str is the starting URL string for listing. |
+ args_iter = iter([ |
+ (self.base_src_url.url_string, self.sorted_list_src_file_name, |
+ 'source'), |
+ (self.base_dst_url.url_string, self.sorted_list_dst_file_name, |
+ 'destination') |
+ ]) |
+ |
+    # Tracks whether a non-retryable listing failure occurred. |
+ command_obj.non_retryable_listing_failures = 0 |
+ shared_attrs = ['non_retryable_listing_failures'] |
+ command_obj.Apply(_ListUrlRootFunc, args_iter, _RootListingExceptionHandler, |
+ shared_attrs, arg_checker=DummyArgChecker, |
+ parallel_operations_override=True, |
+ fail_on_error=True) |
+ |
+ if command_obj.non_retryable_listing_failures: |
+ raise CommandException('Caught non-retryable exception - aborting rsync') |
+ |
+ self.sorted_list_src_file = open(self.sorted_list_src_file_name, 'r') |
+ self.sorted_list_dst_file = open(self.sorted_list_dst_file_name, 'r') |
+ |
+ # Wrap iterators in PluralityCheckableIterator so we can check emptiness. |
+ self.sorted_src_urls_it = PluralityCheckableIterator( |
+ iter(self.sorted_list_src_file)) |
+ self.sorted_dst_urls_it = PluralityCheckableIterator( |
+ iter(self.sorted_list_dst_file)) |
+ |
+ def _ParseTmpFileLine(self, line): |
+ """Parses output from _BuildTmpOutputLine. |
+ |
+ Parses into tuple: |
+ (URL, size, crc32c, md5) |
+ where crc32c and/or md5 can be _NA. |
+ |
+ Args: |
+ line: The line to parse. |
+ |
+ Returns: |
+ Parsed tuple: (url, size, crc32c, md5) |
+ """ |
+ (encoded_url, size, crc32c, md5) = line.split() |
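+    # For example (illustrative), the line 'gs%3A%2F%2Fbkt%2Fobj 1234 - -\n' |
+    # parses to (u'gs://bkt/obj', 1234, '-', '-'). |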
+ return (_DecodeUrl(encoded_url), int(size), crc32c, md5.strip()) |
+ |
+ def _WarnIfMissingCloudHash(self, url_str, crc32c, md5): |
+ """Warns if given url_str is a cloud URL and is missing both crc32c and md5. |
+ |
+ Args: |
+ url_str: Destination URL string. |
+ crc32c: Destination CRC32c. |
+ md5: Destination MD5. |
+ |
+ Returns: |
+ True if issued warning. |
+ """ |
+ # One known way this can currently happen is when rsync'ing objects larger |
+ # than 5 GB from S3 (for which the etag is not an MD5). |
+ if (StorageUrlFromString(url_str).IsCloudUrl() |
+ and crc32c == _NA and md5 == _NA): |
+ self.logger.warn( |
+ 'Found no hashes to validate %s. Integrity cannot be assured without ' |
+ 'hashes.', url_str) |
+ return True |
+ return False |
+ |
+ def _ObjectsMatch(self, src_url_str, src_size, src_crc32c, src_md5, |
+ dst_url_str, dst_size, dst_crc32c, dst_md5): |
+ """Returns True if src and dst objects are the same. |
+ |
+ Uses size plus whatever checksums are available. |
+ |
+ Args: |
+ src_url_str: Source URL string. |
+ src_size: Source size |
+ src_crc32c: Source CRC32c. |
+ src_md5: Source MD5. |
+ dst_url_str: Destination URL string. |
+ dst_size: Destination size |
+ dst_crc32c: Destination CRC32c. |
+ dst_md5: Destination MD5. |
+ |
+ Returns: |
+ True/False. |
+ """ |
+ # Note: This function is called from __iter__, which is called from the |
+ # Command.Apply driver. Thus, all checksum computation will be run in a |
+ # single thread, which is good (having multiple threads concurrently |
+ # computing checksums would thrash the disk). |
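+    # Comparison precedence: size first, then MD5 when both sides have one, |
+    # then CRC32C, falling back to size-only equality when no hashes exist. |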
+ if src_size != dst_size: |
+ return False |
+ if self.compute_file_checksums: |
+ (src_crc32c, src_md5, dst_crc32c, dst_md5) = _ComputeNeededFileChecksums( |
+ self.logger, src_url_str, src_size, src_crc32c, src_md5, dst_url_str, |
+ dst_size, dst_crc32c, dst_md5) |
+ if src_md5 != _NA and dst_md5 != _NA: |
+ self.logger.debug('Comparing md5 for %s and %s', src_url_str, dst_url_str) |
+ return src_md5 == dst_md5 |
+ if src_crc32c != _NA and dst_crc32c != _NA: |
+ self.logger.debug( |
+ 'Comparing crc32c for %s and %s', src_url_str, dst_url_str) |
+ return src_crc32c == dst_crc32c |
+ if not self._WarnIfMissingCloudHash(src_url_str, src_crc32c, src_md5): |
+ self._WarnIfMissingCloudHash(dst_url_str, dst_crc32c, dst_md5) |
+ # Without checksums to compare we depend only on basic size comparison. |
+ return True |
+ |
+ def __iter__(self): |
+ """Iterates over src/dst URLs and produces a _DiffToApply sequence. |
+ |
+ Yields: |
+ The _DiffToApply. |
+ """ |
+ # Strip trailing slashes, if any, so we compute tail length against |
+ # consistent position regardless of whether trailing slashes were included |
+ # or not in URL. |
+ base_src_url_len = len(self.base_src_url.url_string.rstrip('/\\')) |
+ base_dst_url_len = len(self.base_dst_url.url_string.rstrip('/\\')) |
+ src_url_str = dst_url_str = None |
+ # Invariant: After each yield, the URLs in src_url_str, dst_url_str, |
+ # self.sorted_src_urls_it, and self.sorted_dst_urls_it are not yet |
+ # processed. Each time we encounter None in src_url_str or dst_url_str we |
+ # populate from the respective iterator, and we reset one or the other value |
+ # to None after yielding an action that disposes of that URL. |
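+    # Illustration (hypothetical names): with sorted source names ['a', 'c'] |
+    # and sorted destination names ['b', 'c'], this loop yields COPY(a), then |
+    # REMOVE(b) (only if -d was specified), then compares the two 'c' entries |
+    # and yields COPY(c) only if they don't match. |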
+ while not self.sorted_src_urls_it.IsEmpty() or src_url_str is not None: |
+ if src_url_str is None: |
+ (src_url_str, src_size, src_crc32c, src_md5) = self._ParseTmpFileLine( |
+ self.sorted_src_urls_it.next()) |
+ # Skip past base URL and normalize slashes so we can compare across |
+ # clouds/file systems (including Windows). |
+ src_url_str_to_check = _EncodeUrl( |
+ src_url_str[base_src_url_len:].replace('\\', '/')) |
+ dst_url_str_would_copy_to = copy_helper.ConstructDstUrl( |
+ self.base_src_url, StorageUrlFromString(src_url_str), True, True, |
+ self.base_dst_url, False, self.recursion_requested).url_string |
+ if self.sorted_dst_urls_it.IsEmpty(): |
+ # We've reached end of dst URLs, so copy src to dst. |
+ yield _DiffToApply( |
+ src_url_str, dst_url_str_would_copy_to, _DiffAction.COPY) |
+ src_url_str = None |
+ continue |
+ if not dst_url_str: |
+ (dst_url_str, dst_size, dst_crc32c, dst_md5) = ( |
+ self._ParseTmpFileLine(self.sorted_dst_urls_it.next())) |
+        # Skip past base URL and normalize slashes so we can compare across |
+ # clouds/file systems (including Windows). |
+ dst_url_str_to_check = _EncodeUrl( |
+ dst_url_str[base_dst_url_len:].replace('\\', '/')) |
+ |
+ if src_url_str_to_check < dst_url_str_to_check: |
+ # There's no dst object corresponding to src object, so copy src to dst. |
+ yield _DiffToApply( |
+ src_url_str, dst_url_str_would_copy_to, _DiffAction.COPY) |
+ src_url_str = None |
+ elif src_url_str_to_check > dst_url_str_to_check: |
+ # dst object without a corresponding src object, so remove dst if -d |
+ # option was specified. |
+ if self.delete_extras: |
+ yield _DiffToApply(None, dst_url_str, _DiffAction.REMOVE) |
+ dst_url_str = None |
+ else: |
+ # There is a dst object corresponding to src object, so check if objects |
+ # match. |
+ if self._ObjectsMatch( |
+ src_url_str, src_size, src_crc32c, src_md5, |
+ dst_url_str, dst_size, dst_crc32c, dst_md5): |
+ # Continue iterating without yielding a _DiffToApply. |
+ pass |
+ else: |
+ yield _DiffToApply(src_url_str, dst_url_str, _DiffAction.COPY) |
+ src_url_str = None |
+ dst_url_str = None |
+ |
+    # If the -d option was specified, any files/objects left in the dst |
+    # iteration should be removed. |
+ if not self.delete_extras: |
+ return |
+ if dst_url_str: |
+ yield _DiffToApply(None, dst_url_str, _DiffAction.REMOVE) |
+ dst_url_str = None |
+ for line in self.sorted_dst_urls_it: |
+ (dst_url_str, _, _, _) = self._ParseTmpFileLine(line) |
+ yield _DiffToApply(None, dst_url_str, _DiffAction.REMOVE) |
+ |
+ |
+def _RsyncFunc(cls, diff_to_apply, thread_state=None): |
+ """Worker function for performing the actual copy and remove operations.""" |
+ gsutil_api = GetCloudApiInstance(cls, thread_state=thread_state) |
+ dst_url_str = diff_to_apply.dst_url_str |
+ dst_url = StorageUrlFromString(dst_url_str) |
+ if diff_to_apply.diff_action == _DiffAction.REMOVE: |
+ if cls.dryrun: |
+ cls.logger.info('Would remove %s', dst_url) |
+ else: |
+ cls.logger.info('Removing %s', dst_url) |
+ if dst_url.IsFileUrl(): |
+ os.unlink(dst_url.object_name) |
+ else: |
+ try: |
+ gsutil_api.DeleteObject( |
+ dst_url.bucket_name, dst_url.object_name, |
+ generation=dst_url.generation, provider=dst_url.scheme) |
+ except NotFoundException: |
+ # If the object happened to be deleted by an external process, this |
+ # is fine because it moves us closer to the desired state. |
+ pass |
+ elif diff_to_apply.diff_action == _DiffAction.COPY: |
+ src_url_str = diff_to_apply.src_url_str |
+ src_url = StorageUrlFromString(src_url_str) |
+ if cls.dryrun: |
+ cls.logger.info('Would copy %s to %s', src_url, dst_url) |
+ else: |
+ try: |
+ copy_helper.PerformCopy(cls.logger, src_url, dst_url, gsutil_api, cls, |
+ _RsyncExceptionHandler, |
+ headers=cls.headers) |
+      except SkipUnsupportedObjectError as e: |
+ cls.logger.info('Skipping item %s with unsupported object type %s', |
+ src_url, e.unsupported_type) |
+ |
+ else: |
+    raise CommandException('Got unexpected DiffAction (%s)' |
+ % diff_to_apply.diff_action) |
+ |
+ |
+def _RootListingExceptionHandler(cls, e): |
+ """Simple exception handler for exceptions during listing URLs to sync.""" |
+ cls.logger.error(str(e)) |
+ |
+ |
+def _RsyncExceptionHandler(cls, e): |
+ """Simple exception handler to allow post-completion status.""" |
+ cls.logger.error(str(e)) |
+ cls.op_failure_count += 1 |
+ cls.logger.debug('\n\nEncountered exception while syncing:\n%s\n', |
+ traceback.format_exc()) |
+ |
+ |
+class RsyncCommand(Command): |
+ """Implementation of gsutil rsync command.""" |
+ |
+ # Command specification. See base class for documentation. |
+ command_spec = Command.CreateCommandSpec( |
+ 'rsync', |
+ command_name_aliases=[], |
+ usage_synopsis=_SYNOPSIS, |
+ min_args=2, |
+ max_args=2, |
+ supported_sub_args='cCdenprRUx:', |
+ file_url_ok=True, |
+ provider_url_ok=False, |
+ urls_start_arg=0, |
+ gs_api_support=[ApiSelector.XML, ApiSelector.JSON], |
+ gs_default_api=ApiSelector.JSON, |
+ argparse_arguments=[ |
+ CommandArgument.MakeNCloudOrFileURLsArgument(2) |
+ ] |
+ ) |
+ # Help specification. See help_provider.py for documentation. |
+ help_spec = Command.HelpSpec( |
+ help_name='rsync', |
+ help_name_aliases=['sync', 'synchronize'], |
+ help_type='command_help', |
+ help_one_line_summary='Synchronize content of two buckets/directories', |
+ help_text=_DETAILED_HELP_TEXT, |
+ subcommand_help_text={}, |
+ ) |
+ total_bytes_transferred = 0 |
+ |
+ def _InsistContainer(self, url_str, treat_nonexistent_object_as_subdir): |
+ """Sanity checks that URL names an existing container. |
+ |
+ Args: |
+ url_str: URL string to check. |
+      treat_nonexistent_object_as_subdir: Indicates whether a non-existent |
+        object should be treated as a subdir. |
+ |
+ Returns: |
+ URL for checked string. |
+ |
+ Raises: |
+ CommandException if url_str doesn't name an existing container. |
+ """ |
+ (url, have_existing_container) = ( |
+ copy_helper.ExpandUrlToSingleBlr(url_str, self.gsutil_api, self.debug, |
+ self.project_id, |
+ treat_nonexistent_object_as_subdir)) |
+ if not have_existing_container: |
+ raise CommandException( |
+ 'arg (%s) does not name a directory, bucket, or bucket subdir.' |
+ % url_str) |
+ return url |
+ |
+ def RunCommand(self): |
+ """Command entry point for the rsync command.""" |
+ self._ParseOpts() |
+ if self.compute_file_checksums and not UsingCrcmodExtension(crcmod): |
+ self.logger.warn(SLOW_CRCMOD_WARNING) |
+ |
+ src_url = self._InsistContainer(self.args[0], False) |
+ dst_url = self._InsistContainer(self.args[1], True) |
+ |
+ # Tracks if any copy or rm operations failed. |
+ self.op_failure_count = 0 |
+ |
+ # List of attributes to share/manage across multiple processes in |
+ # parallel (-m) mode. |
+ shared_attrs = ['op_failure_count'] |
+ |
+ for signal_num in GetCaughtSignals(): |
+ RegisterSignalHandler(signal_num, _HandleSignals) |
+ |
+ # Perform sync requests in parallel (-m) mode, if requested, using |
+ # configured number of parallel processes and threads. Otherwise, |
+ # perform requests with sequential function calls in current process. |
+ diff_iterator = _DiffIterator(self, src_url, dst_url) |
+ self.logger.info('Starting synchronization') |
+ try: |
+ self.Apply(_RsyncFunc, diff_iterator, _RsyncExceptionHandler, |
+ shared_attrs, arg_checker=_DiffToApplyArgChecker, |
+ fail_on_error=True) |
+ finally: |
+ CleanUpTempFiles() |
+ |
+ if self.op_failure_count: |
+      plural_str = 's' if self.op_failure_count > 1 else '' |
+ raise CommandException( |
+ '%d file%s/object%s could not be copied/removed.' % |
+ (self.op_failure_count, plural_str, plural_str)) |
+ |
+ def _ParseOpts(self): |
+ # exclude_symlinks is handled by Command parent class, so save in Command |
+ # state rather than CopyHelperOpts. |
+ self.exclude_symlinks = False |
+ # continue_on_error is handled by Command parent class, so save in Command |
+ # state rather than CopyHelperOpts. |
+ self.continue_on_error = False |
+ self.delete_extras = False |
+ preserve_acl = False |
+ self.compute_file_checksums = False |
+ self.dryrun = False |
+ self.exclude_pattern = None |
+ self.skip_unsupported_objects = False |
+ # self.recursion_requested is initialized in command.py (so it can be |
+ # checked in parent class for all commands). |
+ |
+ if self.sub_opts: |
+ for o, a in self.sub_opts: |
+ if o == '-c': |
+ self.compute_file_checksums = True |
+        # Note: In the gsutil cp command this is specified using -c, but here |
+        # we use -C so we can use -c for the checksum arg (to be consistent |
+        # with Unix rsync command options). |
+ elif o == '-C': |
+ self.continue_on_error = True |
+ elif o == '-d': |
+ self.delete_extras = True |
+ elif o == '-e': |
+ self.exclude_symlinks = True |
+ elif o == '-n': |
+ self.dryrun = True |
+ elif o == '-p': |
+ preserve_acl = True |
+ elif o == '-r' or o == '-R': |
+ self.recursion_requested = True |
+ elif o == '-U': |
+ self.skip_unsupported_objects = True |
+ elif o == '-x': |
+ if not a: |
+ raise CommandException('Invalid blank exclude filter') |
+ try: |
+ self.exclude_pattern = re.compile(a) |
+ except re.error: |
+ raise CommandException('Invalid exclude filter (%s)' % a) |
+ return CreateCopyHelperOpts( |
+ preserve_acl=preserve_acl, |
+ skip_unsupported_objects=self.skip_unsupported_objects) |