Chromium Code Reviews

Unified Diff: third_party/gsutil/gslib/commands/rsync.py

Issue 1377933002: [catapult] - Copy Telemetry's gsutilz over to third_party. (Closed) Base URL: https://github.com/catapult-project/catapult.git@master
Patch Set: Rename to gsutil. Created 5 years, 3 months ago
Index: third_party/gsutil/gslib/commands/rsync.py
diff --git a/third_party/gsutil/gslib/commands/rsync.py b/third_party/gsutil/gslib/commands/rsync.py
new file mode 100644
index 0000000000000000000000000000000000000000..4eb9b92e92ad9eb1155393634fc999e8f3cbb7e0
--- /dev/null
+++ b/third_party/gsutil/gslib/commands/rsync.py
@@ -0,0 +1,1032 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Implementation of Unix-like rsync command."""
+
+from __future__ import absolute_import
+
+import errno
+import heapq
+import io
+from itertools import islice
+import os
+import re
+import tempfile
+import textwrap
+import traceback
+import urllib
+
+from boto import config
+import crcmod
+
+from gslib import copy_helper
+from gslib.cloud_api import NotFoundException
+from gslib.command import Command
+from gslib.command import DummyArgChecker
+from gslib.command_argument import CommandArgument
+from gslib.copy_helper import CreateCopyHelperOpts
+from gslib.copy_helper import SkipUnsupportedObjectError
+from gslib.cs_api_map import ApiSelector
+from gslib.exception import CommandException
+from gslib.hashing_helper import CalculateB64EncodedCrc32cFromContents
+from gslib.hashing_helper import CalculateB64EncodedMd5FromContents
+from gslib.hashing_helper import SLOW_CRCMOD_WARNING
+from gslib.plurality_checkable_iterator import PluralityCheckableIterator
+from gslib.sig_handling import GetCaughtSignals
+from gslib.sig_handling import RegisterSignalHandler
+from gslib.storage_url import StorageUrlFromString
+from gslib.util import GetCloudApiInstance
+from gslib.util import IsCloudSubdirPlaceholder
+from gslib.util import TEN_MIB
+from gslib.util import UsingCrcmodExtension
+from gslib.util import UTF8
+from gslib.wildcard_iterator import CreateWildcardIterator
+
+
+_SYNOPSIS = """
+ gsutil rsync [-c] [-C] [-d] [-e] [-n] [-p] [-r] [-U] [-x] src_url dst_url
+"""
+
+_DETAILED_HELP_TEXT = ("""
+<B>SYNOPSIS</B>
+""" + _SYNOPSIS + """
+
+
+<B>DESCRIPTION</B>
+ The gsutil rsync command makes the contents under dst_url the same as the
+ contents under src_url, by copying any missing files/objects, and (if the
+ -d option is specified) deleting any extra files/objects. For example, to
+ make gs://mybucket/data match the contents of the local directory "data"
+ you could do:
+
+ gsutil rsync -d data gs://mybucket/data
+
+ To recurse into directories use the -r option:
+
+ gsutil rsync -d -r data gs://mybucket/data
+
+ To copy only new/changed files without deleting extra files from
+ gs://mybucket/data leave off the -d option:
+
+ gsutil rsync -r data gs://mybucket/data
+
+ If you have a large number of objects to synchronize you might want to use the
+ gsutil -m option, to perform parallel (multi-threaded/multi-processing)
+ synchronization:
+
+ gsutil -m rsync -d -r data gs://mybucket/data
+
+ The -m option typically will provide a large performance boost if either the
+ source or destination (or both) is a cloud URL. If both source and
+ destination are file URLs the -m option will typically thrash the disk and
+ slow synchronization down.
+
+ To make the local directory "data" the same as the contents of
+ gs://mybucket/data:
+
+ gsutil rsync -d -r gs://mybucket/data data
+
+ To make the contents of gs://mybucket2 the same as gs://mybucket1:
+
+ gsutil rsync -d -r gs://mybucket1 gs://mybucket2
+
+ You can also mirror data across local directories:
+
+ gsutil rsync -d -r dir1 dir2
+
+ To mirror your content across clouds:
+
+ gsutil rsync -d -r gs://my-gs-bucket s3://my-s3-bucket
+
+ Note: If you are synchronizing a large amount of data between clouds you might
+ consider setting up a
+ `Google Compute Engine <https://cloud.google.com/products/compute-engine>`_
+ account and running gsutil there. Since cross-provider gsutil data transfers
+ flow through the machine where gsutil is running, doing this can make your
+ transfer run significantly faster than running gsutil on your local
+ workstation.
+
+
+<B>BE CAREFUL WHEN USING -d OPTION!</B>
+ The rsync -d option is very useful and commonly used, because it provides a
+ means of making the contents of a destination bucket or directory match those
+ of a source bucket or directory. However, please exercise caution when you
+ use this option: It's possible to delete large amounts of data accidentally
+ if, for example, you erroneously reverse source and destination. For example,
+ if you meant to synchronize a local directory from a bucket in the cloud but
+ instead run the command:
+
+ gsutil -m rsync -r -d ./your-dir gs://your-bucket
+
+ and your-dir is currently empty, you will quickly delete all of the objects in
+ gs://your-bucket.
+
+ You can also cause large amounts of data to be lost quickly by specifying a
+ subdirectory of the destination as the source of an rsync. For example, the
+ command:
+
+ gsutil -m rsync -r -d gs://your-bucket/data gs://your-bucket
+
+ would cause most or all of the objects in gs://your-bucket to be deleted
+ (some objects may survive if there are any with names that sort lower than
+ "data" under gs://your-bucket/data).
+
+ In addition to paying careful attention to the source and destination you
+ specify with the rsync command, there are two more safety measures you can
+ take when using gsutil rsync -d:
+
+ 1. Try running the command with the rsync -n option first, to see what it
+ would do without actually performing the operations. For example, if
+ you run the command:
+
+ gsutil -m rsync -r -d -n gs://your-bucket/data gs://your-bucket
+
+ it will be immediately evident that running that command without the -n
+ option would cause many objects to be deleted.
+
+ 2. Enable object versioning in your bucket, which will allow you to restore
+ objects if you accidentally delete them. For more details see
+ "gsutil help versions".
+
+
+<B>IMPACT OF BUCKET LISTING EVENTUAL CONSISTENCY</B>
+ The rsync command operates by listing the source and destination URLs, and
+ then performing copy and remove operations according to the differences
+ between these listings. Because bucket listing is eventually (not strongly)
+ consistent, if you upload new objects or delete objects from a bucket and then
+ immediately run gsutil rsync with that bucket as the source or destination,
+ it's possible the rsync command will not see the recent updates and thus
+ synchronize incorrectly. You can rerun the rsync operation later to
+ correct any such inconsistency.
+
+
+<B>CHECKSUM VALIDATION AND FAILURE HANDLING</B>
+ At the end of every upload or download, the gsutil rsync command validates
+ that the checksum of the source file/object matches the checksum of the
+ destination file/object. If the checksums do not match, gsutil will delete
+ the invalid copy and print a warning message. This very rarely happens, but
+ if it does, please contact gs-team@google.com.
+
+ The rsync command will retry when failures occur, but if enough failures
+ happen during a particular copy or delete operation the command will skip that
+ object and move on. At the end of the synchronization run if any failures were
+ not successfully retried, the rsync command will report the count of failures,
+ and exit with non-zero status. At this point you can run the rsync command
+ again, and it will attempt any remaining needed copy and/or delete operations.
+
+ Note that there are cases where retrying will never succeed, such as if you
+ don't have write permission to the destination bucket or if the destination
+ path for some objects is longer than the maximum allowed length.
+
+ For more details about gsutil's retry handling, please see
+ "gsutil help retries".
+
+
+<B>CHANGE DETECTION ALGORITHM</B>
+ To determine if a file or object has changed gsutil rsync first checks whether
+ the source and destination sizes match. If they match, it next compares their
+ checksums, using whatever checksums are available (see below). Unlike the Unix
+ rsync command, gsutil rsync does not use timestamps to determine if the
+ file/object changed, because the GCS API does not permit the caller to set an
+ object's timestamp (hence, timestamps of identical files/objects cannot be
+ made to match).
+
+ Checksums will not be available in two cases:
+
+ 1. When synchronizing to or from a file system. By default, gsutil does not
+ checksum files, because of the slowdown caused when working with large
+ files. You can cause gsutil to checksum files by using the gsutil rsync -c
+ option, at the cost of increased local disk I/O and run time when working
+ with large files. You should consider using the -c option if your files can
+ change without changing sizes (e.g., if you have files that contain fixed
+ width data, such as timestamps).
+
+ 2. When comparing composite GCS objects with objects at a cloud provider that
+ does not support CRC32C (which is the only checksum available for composite
+ objects). See 'gsutil help compose' for details about composite objects.
+
+
+<B>COPYING IN THE CLOUD AND METADATA PRESERVATION</B>
+ If both the source and destination URL are cloud URLs from the same provider,
+ gsutil copies data "in the cloud" (i.e., without downloading to and uploading
+ from the machine where you run gsutil). In addition to the performance and
+ cost advantages of doing this, copying in the cloud preserves metadata (like
+ Content-Type and Cache-Control). In contrast, when you download data from the
+ cloud it ends up in a file, which has no associated metadata. Thus, unless you
+ have some way to hold on to or re-create that metadata, synchronizing a bucket
+ to a directory in the local file system will not retain the metadata.
+
+ Note that by default, the gsutil rsync command does not copy the ACLs of
+ objects being synchronized and instead will use the default bucket ACL (see
+ "gsutil help defacl"). You can override this behavior with the -p option (see
+ OPTIONS below).
+
+
+<B>SLOW CHECKSUMS</B>
+ If you find that CRC32C checksum computation runs slowly, this is likely
+ because you don't have a compiled CRC32C implementation installed. Try running:
+
+ gsutil ver -l
+
+ If the output contains:
+
+ compiled crcmod: False
+
+ you are running a Python library for computing CRC32C, which is much slower
+ than using the compiled code. For information on getting a compiled CRC32C
+ implementation, see 'gsutil help crc32c'.
+
+
+<B>LIMITATIONS</B>
+ 1. The gsutil rsync command doesn't make the destination object's timestamps
+ match those of the source object (it can't; timestamp setting is not
+ allowed by the GCS API).
+
+ 2. The gsutil rsync command ignores versioning, synchronizing only the live
+ object versions in versioned buckets.
+
+
+<B>OPTIONS</B>
+ -c Causes the rsync command to compute checksums for files if the
+ size of source and destination match, and then compare
+ checksums. This option increases local disk I/O and run time
+ if either src_url or dst_url is on the local file system.
+
+ -C If an error occurs, continue to attempt to copy the remaining
+ files. If errors occurred, gsutil's exit status will be non-zero
+ even if this flag is set. This option is implicitly set when
+ running "gsutil -m rsync...". Note: -C only applies to the
+ actual copying operation. If an error occurs while iterating
+ over the files in the local directory (e.g., invalid Unicode
+ file name) gsutil will print an error message and abort.
+
+ -d Delete extra files under dst_url not found under src_url. By
+ default extra files are not deleted. Note: this option can
+ delete data quickly if you specify the wrong source/destination
+ combination. See the help section above,
+ "BE CAREFUL WHEN USING -d OPTION!".
+
+ -e Exclude symlinks. When specified, symbolic links will be
+ ignored.
+
+ -n Causes rsync to run in "dry run" mode, i.e., just outputting
+ what would be copied or deleted without actually doing any
+ copying/deleting.
+
+ -p Causes ACLs to be preserved when synchronizing in the cloud.
+ Note that this option has performance and cost implications when
+ using the XML API, as it requires separate HTTP calls for
+ interacting with ACLs. The performance issue can be mitigated to
+ some degree by using gsutil -m rsync to cause parallel
+ synchronization. Also, this option only works if you have OWNER
+ access to all of the objects that are copied.
+
+ If you want all objects in the destination bucket to end up with
+ the same ACL, you can avoid the additional performance and cost of
+ rsync -p by instead setting a default object ACL on that bucket.
+ See 'gsutil help defacl'.
+
+ -R, -r Causes directories, buckets, and bucket subdirectories to be
+ synchronized recursively. If you neglect to use this option
+ gsutil will make only the top-level directory in the source
+ and destination URLs match, skipping any sub-directories.
+
+ -U Skip objects with unsupported object types instead of failing.
+ Currently, the only unsupported object type is Amazon S3 Glacier objects.
+
+ -x pattern Causes files/objects matching pattern to be excluded, i.e., any
+ matching files/objects will not be copied or deleted. Note that
+ the pattern is a Python regular expression, not a wildcard (so,
+ matching any string ending in 'abc' would be specified using
+ '.*abc' rather than '*abc'). Note also that the exclude path is
+ always relative (similar to Unix rsync or tar exclude options).
+ For example, if you run the command:
+
+ gsutil rsync -x 'data./.*\\.txt' dir gs://my-bucket
+
+ it will skip the file dir/data1/a.txt.
+
+ You can use regex alternation to specify multiple exclusions,
+ for example:
+
+ gsutil rsync -x '.*\\.txt|.*\\.jpg' dir gs://my-bucket
+""")
+
+
+class _DiffAction(object):
+ COPY = 'copy'
+ REMOVE = 'remove'
+
+
+_NA = '-'
+_OUTPUT_BUFFER_SIZE = 64 * 1024
+_PROGRESS_REPORT_LISTING_COUNT = 10000
+
+
+# Tracks files we need to clean up at end or if interrupted.
+_tmp_files = []
+
+
+# pylint: disable=unused-argument
+def _HandleSignals(signal_num, cur_stack_frame):
+ """Called when rsync command is killed with SIGINT, SIGQUIT or SIGTERM."""
+ CleanUpTempFiles()
+
+
+def CleanUpTempFiles():
+ """Cleans up temp files.
+
+ This function allows the main (RunCommand) function to clean up at end of
+ operation, or if gsutil rsync is interrupted (e.g., via ^C). This is necessary
+ because tempfile.NamedTemporaryFile doesn't allow the created file to be
+ re-opened in read mode on Windows, so we have to use tempfile.mkstemp, which
+ doesn't automatically delete temp files.
+ """
+ try:
+ for fname in _tmp_files:
+ os.unlink(fname)
+ except: # pylint: disable=bare-except
+ pass
+
+
+class _DiffToApply(object):
+ """Class that encapsulates info needed to apply diff for one object."""
+
+ def __init__(self, src_url_str, dst_url_str, diff_action):
+ """Constructor.
+
+ Args:
+ src_url_str: The source URL string, or None if diff_action is REMOVE.
+ dst_url_str: The destination URL string.
+ diff_action: _DiffAction to be applied.
+ """
+ self.src_url_str = src_url_str
+ self.dst_url_str = dst_url_str
+ self.diff_action = diff_action
+
+
+def _DiffToApplyArgChecker(command_instance, diff_to_apply):
+ """Arg checker that skips symlinks if -e flag specified."""
+ if (diff_to_apply.diff_action == _DiffAction.REMOVE
+ or not command_instance.exclude_symlinks):
+ # No src URL is populated for REMOVE actions.
+ return True
+ exp_src_url = StorageUrlFromString(diff_to_apply.src_url_str)
+ if exp_src_url.IsFileUrl() and os.path.islink(exp_src_url.object_name):
+ command_instance.logger.info('Skipping symbolic link %s...', exp_src_url)
+ return False
+ return True
+
+
+def _ComputeNeededFileChecksums(logger, src_url_str, src_size, src_crc32c,
+ src_md5, dst_url_str, dst_size, dst_crc32c,
+ dst_md5):
+ """Computes any file checksums needed by _ObjectsMatch.
+
+ Args:
+ logger: logging.logger for outputting log messages.
+ src_url_str: Source URL string.
+ src_size: Source size
+ src_crc32c: Source CRC32c.
+ src_md5: Source MD5.
+ dst_url_str: Destination URL string.
+ dst_size: Destination size
+ dst_crc32c: Destination CRC32c.
+ dst_md5: Destination MD5.
+
+ Returns:
+ (src_crc32c, src_md5, dst_crc32c, dst_md5)
+ """
+ src_url = StorageUrlFromString(src_url_str)
+ dst_url = StorageUrlFromString(dst_url_str)
+ if src_url.IsFileUrl():
+ if dst_crc32c != _NA or dst_url.IsFileUrl():
+ if src_size > TEN_MIB:
+ logger.info('Computing CRC32C for %s...', src_url_str)
+ with open(src_url.object_name, 'rb') as fp:
+ src_crc32c = CalculateB64EncodedCrc32cFromContents(fp)
+ elif dst_md5 != _NA or dst_url.IsFileUrl():
+ if src_size > TEN_MIB:
+ logger.info('Computing MD5 for %s...', src_url_str)
+ with open(src_url.object_name, 'rb') as fp:
+ src_md5 = CalculateB64EncodedMd5FromContents(fp)
+ if dst_url.IsFileUrl():
+ if src_crc32c != _NA:
+ if dst_size > TEN_MIB:
+ logger.info('Computing CRC32C for %s...', dst_url_str)
+ with open(dst_url.object_name, 'rb') as fp:
+ dst_crc32c = CalculateB64EncodedCrc32cFromContents(fp)
+ elif src_md5 != _NA:
+ if dst_size > TEN_MIB:
+ logger.info('Computing MD5 for %s...', dst_url_str)
+ with open(dst_url.object_name, 'rb') as fp:
+ dst_md5 = CalculateB64EncodedMd5FromContents(fp)
+ return (src_crc32c, src_md5, dst_crc32c, dst_md5)
+
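+# Editor's illustration (not part of upstream gsutil): for a local source and
+# a cloud destination that already reports a CRC32C, only the source file's
+# CRC32C is computed above so the two can be compared; the MD5 branch is taken
+# only when the cloud destination exposes an MD5 but no CRC32C (e.g., S3).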
+
+def _ListUrlRootFunc(cls, args_tuple, thread_state=None):
+ """Worker function for listing files/objects under to be sync'd.
+
+ Outputs sorted list to out_file_name, formatted per _BuildTmpOutputLine. We
+ sort the listed URLs because we don't want to depend on consistent sort
+ order across file systems and cloud providers.
+
+ Args:
+ cls: Command instance.
+ args_tuple: (base_url_str, out_file_name, desc), where base_url_str is
+ top-level URL string to list; out_filename is name of file to
+ which sorted output should be written; desc is 'source' or
+ 'destination'.
+ thread_state: gsutil Cloud API instance to use.
+ """
+ gsutil_api = GetCloudApiInstance(cls, thread_state=thread_state)
+ (base_url_str, out_filename, desc) = args_tuple
+ # We sort while iterating over base_url_str, allowing parallelism of batched
+ # sorting with collecting the listing.
+ out_file = io.open(out_filename, mode='w', encoding=UTF8)
+ try:
+ _BatchSort(_FieldedListingIterator(cls, gsutil_api, base_url_str, desc),
+ out_file)
+ except Exception as e: # pylint: disable=broad-except
+ # Abandon rsync if an exception percolates up to this layer - retryable
+ # exceptions are handled in the lower layers, so we got a non-retryable
+ # exception (like 404 bucket not found) and proceeding would either be
+ # futile or could result in data loss - for example:
+ # gsutil rsync -d gs://non-existent-bucket ./localdir
+ # would delete files from localdir.
+ cls.logger.error(
+ 'Caught non-retryable exception while listing %s: %s' %
+ (base_url_str, e))
+ cls.non_retryable_listing_failures = 1
+ out_file.close()
+
+
+def _FieldedListingIterator(cls, gsutil_api, base_url_str, desc):
+ """Iterator over base_url_str formatting output per _BuildTmpOutputLine.
+
+ Args:
+ cls: Command instance.
+ gsutil_api: gsutil Cloud API instance to use for bucket listing.
+ base_url_str: The top-level URL string over which to iterate.
+ desc: 'source' or 'destination'.
+
+ Yields:
+ Output line formatted per _BuildTmpOutputLine.
+ """
+ if cls.recursion_requested:
+ wildcard = '%s/**' % base_url_str.rstrip('/\\')
+ else:
+ wildcard = '%s/*' % base_url_str.rstrip('/\\')
+ i = 0
+ for blr in CreateWildcardIterator(
+ wildcard, gsutil_api, debug=cls.debug,
+ project_id=cls.project_id).IterObjects(
+ # Request just the needed fields, to reduce bandwidth usage.
+ bucket_listing_fields=['crc32c', 'md5Hash', 'name', 'size']):
+ # Various GUI tools (like the GCS web console) create placeholder objects
+ # ending with '/' when the user creates an empty directory. Normally these
+ # tools should delete those placeholders once objects have been written
+ # "under" the directory, but sometimes the placeholders are left around.
+ # We need to filter them out here, otherwise if the user tries to rsync
+ # from GCS to a local directory it will result in a directory/file
+ # conflict (e.g., trying to download an object called "mydata/" where the
+ # local directory "mydata" exists).
+ url = blr.storage_url
+ if IsCloudSubdirPlaceholder(url, blr=blr):
+ cls.logger.info('Skipping cloud sub-directory placeholder object (%s) '
+ 'because such objects aren\'t needed in (and would '
+ 'interfere with) directories in the local file system',
+ url)
+ continue
+ if (cls.exclude_symlinks and url.IsFileUrl()
+ and os.path.islink(url.object_name)):
+ continue
+ if cls.exclude_pattern:
+ str_to_check = url.url_string[len(base_url_str):]
+ if str_to_check.startswith(url.delim):
+ str_to_check = str_to_check[1:]
+ if cls.exclude_pattern.match(str_to_check):
+ continue
+ i += 1
+ if i % _PROGRESS_REPORT_LISTING_COUNT == 0:
+ cls.logger.info('At %s listing %d...', desc, i)
+ yield _BuildTmpOutputLine(blr)
+
+
+def _BuildTmpOutputLine(blr):
+ """Builds line to output to temp file for given BucketListingRef.
+
+ Args:
+ blr: The BucketListingRef.
+
+ Returns:
+ The output line, formatted as _EncodeUrl(URL)<sp>size<sp>crc32c<sp>md5
+ where crc32c will only be present for GCS URLs, and md5 will only be
+ present for cloud URLs that aren't composite objects. A missing field is
+ populated with '-'.
+ """
+ crc32c = _NA
+ md5 = _NA
+ url = blr.storage_url
+ if url.IsFileUrl():
+ size = os.path.getsize(url.object_name)
+ elif url.IsCloudUrl():
+ size = blr.root_object.size
+ crc32c = blr.root_object.crc32c or _NA
+ md5 = blr.root_object.md5Hash or _NA
+ else:
+ raise CommandException('Got unexpected URL type (%s)' % url.scheme)
+ return '%s %d %s %s\n' % (_EncodeUrl(url.url_string), size, crc32c, md5)
+
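+# Editor's illustration with made-up values (not from upstream gsutil): a GCS
+# object gs://bucket/dir/obj of 1234 bytes with crc32c 'AAAAAA==' and md5Hash
+# 'BBBBBBBBBBBBBBBBBBBBBB==' yields the line
+#   'gs%3A%2F%2Fbucket%2Fdir%2Fobj 1234 AAAAAA== BBBBBBBBBBBBBBBBBBBBBB=='
+# (plus a trailing newline), which _ParseTmpFileLine later splits on spaces.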
+
+def _EncodeUrl(url_string):
+ """Encodes url_str with quote plus encoding and UTF8 character encoding.
+
+ We use this for all URL encodings.
+
+ Args:
+ url_string: String URL to encode.
+
+ Returns:
+ encoded URL.
+ """
+ return urllib.quote_plus(url_string.encode(UTF8))
+
+
+def _DecodeUrl(enc_url_string):
+ """Inverts encoding from EncodeUrl.
+
+ Args:
+ enc_url_string: String URL to decode.
+
+ Returns:
+ decoded URL.
+ """
+ return urllib.unquote_plus(enc_url_string).decode(UTF8)
+
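+# Editor's note: quote_plus encoding keeps each listing line free of embedded
+# whitespace, so the space-delimited format above can be split safely, and
+# _DecodeUrl(_EncodeUrl(s)) round-trips the original URL string.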
+
+# pylint: disable=bare-except
+def _BatchSort(in_iter, out_file):
+ """Sorts input lines from in_iter and outputs to out_file.
+
+ Sorts in batches as input arrives, so input file does not need to be loaded
+ into memory all at once. Derived from Python Recipe 466302: Sorting big
+ files the Python 2.4 way by Nicolas Lehuen.
+
+ Sorted format is per _BuildTmpOutputLine. We're sorting on the entire line
+ when we could just sort on the first record (URL); but the sort order is
+ identical either way.
+
+ Args:
+ in_iter: Input iterator.
+ out_file: Output file.
+ """
+ # Note: If chunk_files gets very large we can run out of open FDs. See .boto
+ # file comments about rsync_buffer_lines. If increasing rsync_buffer_lines
+ # doesn't suffice (e.g., for someone synchronizing with a really large
+ # bucket), an option would be to make gsutil merge in passes, never
+ # opening all chunk files simultaneously.
+ buffer_size = config.getint('GSUtil', 'rsync_buffer_lines', 32000)
+ chunk_files = []
+ try:
+ while True:
+ current_chunk = sorted(islice(in_iter, buffer_size))
+ if not current_chunk:
+ break
+ output_chunk = io.open('%s-%06i' % (out_file.name, len(chunk_files)),
+ mode='w+', encoding=UTF8)
+ chunk_files.append(output_chunk)
+ output_chunk.writelines(unicode(''.join(current_chunk)))
+ output_chunk.flush()
+ output_chunk.seek(0)
+ out_file.writelines(heapq.merge(*chunk_files))
+ except IOError as e:
+ if e.errno == errno.EMFILE:
+ raise CommandException('\n'.join(textwrap.wrap(
+ 'Synchronization failed because too many open file handles were '
+ 'needed while building synchronization state. Please see the '
+ 'comments about rsync_buffer_lines in your .boto config file for a '
+ 'possible way to address this problem.')))
+ raise
+ finally:
+ for chunk_file in chunk_files:
+ try:
+ chunk_file.close()
+ os.remove(chunk_file.name)
+ except:
+ pass
+
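+# Editor's sketch of the same external-sort pattern in isolation (assumes a
+# plain iterable of newline-terminated strings; not part of upstream gsutil):
+#
+#   chunks = []
+#   while True:
+#     batch = sorted(islice(in_iter, buffer_size))
+#     if not batch:
+#       break
+#     f = tempfile.TemporaryFile()
+#     f.writelines(batch)
+#     f.seek(0)
+#     chunks.append(f)
+#   out_file.writelines(heapq.merge(*chunks))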
+
+class _DiffIterator(object):
+ """Iterator yielding sequence of _DiffToApply objects."""
+
+ def __init__(self, command_obj, base_src_url, base_dst_url):
+ self.command_obj = command_obj
+ self.compute_file_checksums = command_obj.compute_file_checksums
+ self.delete_extras = command_obj.delete_extras
+ self.recursion_requested = command_obj.recursion_requested
+ self.logger = self.command_obj.logger
+ self.base_src_url = base_src_url
+ self.base_dst_url = base_dst_url
+ self.logger.info('Building synchronization state...')
+
+ (src_fh, self.sorted_list_src_file_name) = tempfile.mkstemp(
+ prefix='gsutil-rsync-src-')
+ _tmp_files.append(self.sorted_list_src_file_name)
+ (dst_fh, self.sorted_list_dst_file_name) = tempfile.mkstemp(
+ prefix='gsutil-rsync-dst-')
+ _tmp_files.append(self.sorted_list_dst_file_name)
+ # Close the file handles; the file will be opened in write mode by
+ # _ListUrlRootFunc.
+ os.close(src_fh)
+ os.close(dst_fh)
+
+ # Build sorted lists of src and dst URLs in parallel. To do this, pass args
+ # to _ListUrlRootFunc as tuple (base_url_str, out_filename, desc)
+ # where base_url_str is the starting URL string for listing.
+ args_iter = iter([
+ (self.base_src_url.url_string, self.sorted_list_src_file_name,
+ 'source'),
+ (self.base_dst_url.url_string, self.sorted_list_dst_file_name,
+ 'destination')
+ ])
+
+ # Flag (shared across processes) indicating a non-retryable listing failure.
+ command_obj.non_retryable_listing_failures = 0
+ shared_attrs = ['non_retryable_listing_failures']
+ command_obj.Apply(_ListUrlRootFunc, args_iter, _RootListingExceptionHandler,
+ shared_attrs, arg_checker=DummyArgChecker,
+ parallel_operations_override=True,
+ fail_on_error=True)
+
+ if command_obj.non_retryable_listing_failures:
+ raise CommandException('Caught non-retryable exception - aborting rsync')
+
+ self.sorted_list_src_file = open(self.sorted_list_src_file_name, 'r')
+ self.sorted_list_dst_file = open(self.sorted_list_dst_file_name, 'r')
+
+ # Wrap iterators in PluralityCheckableIterator so we can check emptiness.
+ self.sorted_src_urls_it = PluralityCheckableIterator(
+ iter(self.sorted_list_src_file))
+ self.sorted_dst_urls_it = PluralityCheckableIterator(
+ iter(self.sorted_list_dst_file))
+
+ def _ParseTmpFileLine(self, line):
+ """Parses output from _BuildTmpOutputLine.
+
+ Parses into tuple:
+ (URL, size, crc32c, md5)
+ where crc32c and/or md5 can be _NA.
+
+ Args:
+ line: The line to parse.
+
+ Returns:
+ Parsed tuple: (url, size, crc32c, md5)
+ """
+ (encoded_url, size, crc32c, md5) = line.split()
+ return (_DecodeUrl(encoded_url), int(size), crc32c, md5.strip())
+
+ def _WarnIfMissingCloudHash(self, url_str, crc32c, md5):
+ """Warns if given url_str is a cloud URL and is missing both crc32c and md5.
+
+ Args:
+ url_str: Destination URL string.
+ crc32c: Destination CRC32c.
+ md5: Destination MD5.
+
+ Returns:
+ True if issued warning.
+ """
+ # One known way this can currently happen is when rsync'ing objects larger
+ # than 5 GB from S3 (for which the etag is not an MD5).
+ if (StorageUrlFromString(url_str).IsCloudUrl()
+ and crc32c == _NA and md5 == _NA):
+ self.logger.warn(
+ 'Found no hashes to validate %s. Integrity cannot be assured without '
+ 'hashes.', url_str)
+ return True
+ return False
+
+ def _ObjectsMatch(self, src_url_str, src_size, src_crc32c, src_md5,
+ dst_url_str, dst_size, dst_crc32c, dst_md5):
+ """Returns True if src and dst objects are the same.
+
+ Uses size plus whatever checksums are available.
+
+ Args:
+ src_url_str: Source URL string.
+ src_size: Source size
+ src_crc32c: Source CRC32c.
+ src_md5: Source MD5.
+ dst_url_str: Destination URL string.
+ dst_size: Destination size
+ dst_crc32c: Destination CRC32c.
+ dst_md5: Destination MD5.
+
+ Returns:
+ True/False.
+ """
+ # Note: This function is called from __iter__, which is called from the
+ # Command.Apply driver. Thus, all checksum computation will be run in a
+ # single thread, which is good (having multiple threads concurrently
+ # computing checksums would thrash the disk).
+ if src_size != dst_size:
+ return False
+ if self.compute_file_checksums:
+ (src_crc32c, src_md5, dst_crc32c, dst_md5) = _ComputeNeededFileChecksums(
+ self.logger, src_url_str, src_size, src_crc32c, src_md5, dst_url_str,
+ dst_size, dst_crc32c, dst_md5)
+ if src_md5 != _NA and dst_md5 != _NA:
+ self.logger.debug('Comparing md5 for %s and %s', src_url_str, dst_url_str)
+ return src_md5 == dst_md5
+ if src_crc32c != _NA and dst_crc32c != _NA:
+ self.logger.debug(
+ 'Comparing crc32c for %s and %s', src_url_str, dst_url_str)
+ return src_crc32c == dst_crc32c
+ if not self._WarnIfMissingCloudHash(src_url_str, src_crc32c, src_md5):
+ self._WarnIfMissingCloudHash(dst_url_str, dst_crc32c, dst_md5)
+ # Without checksums to compare we depend only on basic size comparison.
+ return True
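+ # Editor's note: the effective comparison order in this method is size
+ # first, then MD5 when both sides report one, then CRC32C, and finally
+ # size-only (after warning when a cloud object exposes neither hash).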
+
+ def __iter__(self):
+ """Iterates over src/dst URLs and produces a _DiffToApply sequence.
+
+ Yields:
+ The _DiffToApply.
+ """
+ # Strip trailing slashes, if any, so we compute tail length against
+ # consistent position regardless of whether trailing slashes were included
+ # or not in URL.
+ base_src_url_len = len(self.base_src_url.url_string.rstrip('/\\'))
+ base_dst_url_len = len(self.base_dst_url.url_string.rstrip('/\\'))
+ src_url_str = dst_url_str = None
+ # Invariant: After each yield, the URLs in src_url_str, dst_url_str,
+ # self.sorted_src_urls_it, and self.sorted_dst_urls_it are not yet
+ # processed. Each time we encounter None in src_url_str or dst_url_str we
+ # populate from the respective iterator, and we reset one or the other value
+ # to None after yielding an action that disposes of that URL.
+ while not self.sorted_src_urls_it.IsEmpty() or src_url_str is not None:
+ if src_url_str is None:
+ (src_url_str, src_size, src_crc32c, src_md5) = self._ParseTmpFileLine(
+ self.sorted_src_urls_it.next())
+ # Skip past base URL and normalize slashes so we can compare across
+ # clouds/file systems (including Windows).
+ src_url_str_to_check = _EncodeUrl(
+ src_url_str[base_src_url_len:].replace('\\', '/'))
+ dst_url_str_would_copy_to = copy_helper.ConstructDstUrl(
+ self.base_src_url, StorageUrlFromString(src_url_str), True, True,
+ self.base_dst_url, False, self.recursion_requested).url_string
+ if self.sorted_dst_urls_it.IsEmpty():
+ # We've reached end of dst URLs, so copy src to dst.
+ yield _DiffToApply(
+ src_url_str, dst_url_str_would_copy_to, _DiffAction.COPY)
+ src_url_str = None
+ continue
+ if not dst_url_str:
+ (dst_url_str, dst_size, dst_crc32c, dst_md5) = (
+ self._ParseTmpFileLine(self.sorted_dst_urls_it.next()))
+ # Skip past base URL and normalize slashes so we can compare across
+ # clouds/file systems (including Windows).
+ dst_url_str_to_check = _EncodeUrl(
+ dst_url_str[base_dst_url_len:].replace('\\', '/'))
+
+ if src_url_str_to_check < dst_url_str_to_check:
+ # There's no dst object corresponding to src object, so copy src to dst.
+ yield _DiffToApply(
+ src_url_str, dst_url_str_would_copy_to, _DiffAction.COPY)
+ src_url_str = None
+ elif src_url_str_to_check > dst_url_str_to_check:
+ # dst object without a corresponding src object, so remove dst if -d
+ # option was specified.
+ if self.delete_extras:
+ yield _DiffToApply(None, dst_url_str, _DiffAction.REMOVE)
+ dst_url_str = None
+ else:
+ # There is a dst object corresponding to src object, so check if objects
+ # match.
+ if self._ObjectsMatch(
+ src_url_str, src_size, src_crc32c, src_md5,
+ dst_url_str, dst_size, dst_crc32c, dst_md5):
+ # Continue iterating without yielding a _DiffToApply.
+ pass
+ else:
+ yield _DiffToApply(src_url_str, dst_url_str, _DiffAction.COPY)
+ src_url_str = None
+ dst_url_str = None
+
+ # If -d option specified any files/objects left in dst iteration should be
+ # removed.
+ if not self.delete_extras:
+ return
+ if dst_url_str:
+ yield _DiffToApply(None, dst_url_str, _DiffAction.REMOVE)
+ dst_url_str = None
+ for line in self.sorted_dst_urls_it:
+ (dst_url_str, _, _, _) = self._ParseTmpFileLine(line)
+ yield _DiffToApply(None, dst_url_str, _DiffAction.REMOVE)
+
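+# Editor's walk-through (illustrative, not in upstream gsutil): given sorted
+# source listing [a, b] and destination listing [b, c], _DiffIterator yields
+# COPY for a, yields nothing for b when _ObjectsMatch returns True, and yields
+# REMOVE for c only if -d (delete_extras) was specified.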
+
+def _RsyncFunc(cls, diff_to_apply, thread_state=None):
+ """Worker function for performing the actual copy and remove operations."""
+ gsutil_api = GetCloudApiInstance(cls, thread_state=thread_state)
+ dst_url_str = diff_to_apply.dst_url_str
+ dst_url = StorageUrlFromString(dst_url_str)
+ if diff_to_apply.diff_action == _DiffAction.REMOVE:
+ if cls.dryrun:
+ cls.logger.info('Would remove %s', dst_url)
+ else:
+ cls.logger.info('Removing %s', dst_url)
+ if dst_url.IsFileUrl():
+ os.unlink(dst_url.object_name)
+ else:
+ try:
+ gsutil_api.DeleteObject(
+ dst_url.bucket_name, dst_url.object_name,
+ generation=dst_url.generation, provider=dst_url.scheme)
+ except NotFoundException:
+ # If the object happened to be deleted by an external process, this
+ # is fine because it moves us closer to the desired state.
+ pass
+ elif diff_to_apply.diff_action == _DiffAction.COPY:
+ src_url_str = diff_to_apply.src_url_str
+ src_url = StorageUrlFromString(src_url_str)
+ if cls.dryrun:
+ cls.logger.info('Would copy %s to %s', src_url, dst_url)
+ else:
+ try:
+ copy_helper.PerformCopy(cls.logger, src_url, dst_url, gsutil_api, cls,
+ _RsyncExceptionHandler,
+ headers=cls.headers)
+ except SkipUnsupportedObjectError as e:
+ cls.logger.info('Skipping item %s with unsupported object type %s',
+ src_url, e.unsupported_type)
+
+ else:
+ raise CommandException('Got unexpected DiffAction (%s)'
+ % diff_to_apply.diff_action)
+
+
+def _RootListingExceptionHandler(cls, e):
+ """Simple exception handler for exceptions during listing URLs to sync."""
+ cls.logger.error(str(e))
+
+
+def _RsyncExceptionHandler(cls, e):
+ """Simple exception handler to allow post-completion status."""
+ cls.logger.error(str(e))
+ cls.op_failure_count += 1
+ cls.logger.debug('\n\nEncountered exception while syncing:\n%s\n',
+ traceback.format_exc())
+
+
+class RsyncCommand(Command):
+ """Implementation of gsutil rsync command."""
+
+ # Command specification. See base class for documentation.
+ command_spec = Command.CreateCommandSpec(
+ 'rsync',
+ command_name_aliases=[],
+ usage_synopsis=_SYNOPSIS,
+ min_args=2,
+ max_args=2,
+ supported_sub_args='cCdenprRUx:',
+ file_url_ok=True,
+ provider_url_ok=False,
+ urls_start_arg=0,
+ gs_api_support=[ApiSelector.XML, ApiSelector.JSON],
+ gs_default_api=ApiSelector.JSON,
+ argparse_arguments=[
+ CommandArgument.MakeNCloudOrFileURLsArgument(2)
+ ]
+ )
+ # Help specification. See help_provider.py for documentation.
+ help_spec = Command.HelpSpec(
+ help_name='rsync',
+ help_name_aliases=['sync', 'synchronize'],
+ help_type='command_help',
+ help_one_line_summary='Synchronize content of two buckets/directories',
+ help_text=_DETAILED_HELP_TEXT,
+ subcommand_help_text={},
+ )
+ total_bytes_transferred = 0
+
+ def _InsistContainer(self, url_str, treat_nonexistent_object_as_subdir):
+ """Sanity checks that URL names an existing container.
+
+ Args:
+ url_str: URL string to check.
+ treat_nonexistent_object_as_subdir: indicates if should treat a
+ non-existent object as a subdir.
+
+ Returns:
+ URL for checked string.
+
+ Raises:
+ CommandException if url_str doesn't name an existing container.
+ """
+ (url, have_existing_container) = (
+ copy_helper.ExpandUrlToSingleBlr(url_str, self.gsutil_api, self.debug,
+ self.project_id,
+ treat_nonexistent_object_as_subdir))
+ if not have_existing_container:
+ raise CommandException(
+ 'arg (%s) does not name a directory, bucket, or bucket subdir.'
+ % url_str)
+ return url
+
+ def RunCommand(self):
+ """Command entry point for the rsync command."""
+ self._ParseOpts()
+ if self.compute_file_checksums and not UsingCrcmodExtension(crcmod):
+ self.logger.warn(SLOW_CRCMOD_WARNING)
+
+ src_url = self._InsistContainer(self.args[0], False)
+ dst_url = self._InsistContainer(self.args[1], True)
+
+ # Tracks if any copy or rm operations failed.
+ self.op_failure_count = 0
+
+ # List of attributes to share/manage across multiple processes in
+ # parallel (-m) mode.
+ shared_attrs = ['op_failure_count']
+
+ for signal_num in GetCaughtSignals():
+ RegisterSignalHandler(signal_num, _HandleSignals)
+
+ # Perform sync requests in parallel (-m) mode, if requested, using
+ # configured number of parallel processes and threads. Otherwise,
+ # perform requests with sequential function calls in current process.
+ diff_iterator = _DiffIterator(self, src_url, dst_url)
+ self.logger.info('Starting synchronization')
+ try:
+ self.Apply(_RsyncFunc, diff_iterator, _RsyncExceptionHandler,
+ shared_attrs, arg_checker=_DiffToApplyArgChecker,
+ fail_on_error=True)
+ finally:
+ CleanUpTempFiles()
+
+ if self.op_failure_count:
+ plural_str = 's' if self.op_failure_count > 1 else ''
+ raise CommandException(
+ '%d file%s/object%s could not be copied/removed.' %
+ (self.op_failure_count, plural_str, plural_str))
+
+ def _ParseOpts(self):
+ # exclude_symlinks is handled by Command parent class, so save in Command
+ # state rather than CopyHelperOpts.
+ self.exclude_symlinks = False
+ # continue_on_error is handled by Command parent class, so save in Command
+ # state rather than CopyHelperOpts.
+ self.continue_on_error = False
+ self.delete_extras = False
+ preserve_acl = False
+ self.compute_file_checksums = False
+ self.dryrun = False
+ self.exclude_pattern = None
+ self.skip_unsupported_objects = False
+ # self.recursion_requested is initialized in command.py (so it can be
+ # checked in parent class for all commands).
+
+ if self.sub_opts:
+ for o, a in self.sub_opts:
+ if o == '-c':
+ self.compute_file_checksums = True
+ # Note: In gsutil cp command this is specified using -c but here we use
+ # -C so we can use -c for checksum arg (to be consistent with Unix rsync
+ # command options).
+ elif o == '-C':
+ self.continue_on_error = True
+ elif o == '-d':
+ self.delete_extras = True
+ elif o == '-e':
+ self.exclude_symlinks = True
+ elif o == '-n':
+ self.dryrun = True
+ elif o == '-p':
+ preserve_acl = True
+ elif o == '-r' or o == '-R':
+ self.recursion_requested = True
+ elif o == '-U':
+ self.skip_unsupported_objects = True
+ elif o == '-x':
+ if not a:
+ raise CommandException('Invalid blank exclude filter')
+ try:
+ self.exclude_pattern = re.compile(a)
+ except re.error:
+ raise CommandException('Invalid exclude filter (%s)' % a)
+ return CreateCopyHelperOpts(
+ preserve_acl=preserve_acl,
+ skip_unsupported_objects=self.skip_unsupported_objects)
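+ # Editor's illustration (hypothetical command, not in upstream gsutil):
+ # "gsutil rsync -c -d -x '.*\.bak$' dir gs://bucket" sets
+ # compute_file_checksums and delete_extras to True and compiles the -x
+ # pattern via re.compile, while -p only affects the returned CopyHelperOpts.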
