| OLD | NEW |
| (Empty) | |
| 1 # -*- coding: utf-8 -*- |
| 2 # Copyright 2014 Google Inc. All Rights Reserved. |
| 3 # |
| 4 # Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 # you may not use this file except in compliance with the License. |
| 6 # You may obtain a copy of the License at |
| 7 # |
| 8 # http://www.apache.org/licenses/LICENSE-2.0 |
| 9 # |
| 10 # Unless required by applicable law or agreed to in writing, software |
| 11 # distributed under the License is distributed on an "AS IS" BASIS, |
| 12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 # See the License for the specific language governing permissions and |
| 14 # limitations under the License. |
| 15 """Implementation of Unix-like rsync command.""" |
| 16 |
| 17 from __future__ import absolute_import |
| 18 |
| 19 import errno |
| 20 import heapq |
| 21 import io |
| 22 from itertools import islice |
| 23 import os |
| 24 import tempfile |
| 25 import textwrap |
| 26 import traceback |
| 27 import urllib |
| 28 |
| 29 from boto import config |
| 30 import crcmod |
| 31 |
| 32 from gslib import copy_helper |
| 33 from gslib.cloud_api import NotFoundException |
| 34 from gslib.command import Command |
| 35 from gslib.command import DummyArgChecker |
| 36 from gslib.copy_helper import CreateCopyHelperOpts |
| 37 from gslib.cs_api_map import ApiSelector |
| 38 from gslib.exception import CommandException |
| 39 from gslib.hashing_helper import CalculateB64EncodedCrc32cFromContents |
| 40 from gslib.hashing_helper import CalculateB64EncodedMd5FromContents |
| 41 from gslib.hashing_helper import SLOW_CRCMOD_WARNING |
| 42 from gslib.plurality_checkable_iterator import PluralityCheckableIterator |
| 43 from gslib.storage_url import StorageUrlFromString |
| 44 from gslib.util import GetCloudApiInstance |
| 45 from gslib.util import IsCloudSubdirPlaceholder |
| 46 from gslib.util import TEN_MB |
| 47 from gslib.util import UsingCrcmodExtension |
| 48 from gslib.util import UTF8 |
| 49 from gslib.wildcard_iterator import CreateWildcardIterator |
| 50 |
| 51 |
| 52 _DETAILED_HELP_TEXT = (""" |
| 53 <B>SYNOPSIS</B> |
| 54 gsutil rsync [-c] [-C] [-d] [-e] [-n] [-p] [-R] src_url dst_url |
| 55 |
| 56 |
| 57 <B>DESCRIPTION</B> |
| 58 The gsutil rsync command makes the contents under dst_url the same as the |
| 59 contents under src_url, by copying any missing files/objects, and (if the |
| 60 -d option is specified) deleting any extra files/objects. For example, to |
| 61 make gs://mybucket/data match the contents of the local directory "data" |
| 62 you could do: |
| 63 |
| 64 gsutil rsync -d data gs://mybucket/data |
| 65 |
| 66 To recurse into directories use the -r option: |
| 67 |
| 68 gsutil rsync -d -r data gs://mybucket/data |
| 69 |
| 70 To copy only new/changed files without deleting extra files from |
| 71 gs://mybucket/data leave off the -d option: |
| 72 |
| 73 gsutil rsync -r data gs://mybucket/data |
| 74 |
| 75 If you have a large number of objects to synchronize you might want to use the |
| 76 gsutil -m option, to perform parallel (multi-threaded/multi-processing) |
| 77 synchronization: |
| 78 |
| 79 gsutil -m rsync -d -r data gs://mybucket/data |
| 80 |
| 81 The -m option typically will provide a large performance boost if either the |
| 82 source or destination (or both) is a cloud URL. If both source and |
| 83 destination are file URLs the -m option will typically thrash the disk and |
| 84 slow synchronization down. |
| 85 |
| 86 To make the local directory "data" the same as the contents of |
| 87 gs://mybucket/data: |
| 88 |
| 89 gsutil rsync -d -r gs://mybucket/data data |
| 90 |
| 91 To make the contents of gs://mybucket2 the same as gs://mybucket1: |
| 92 |
| 93 gsutil rsync -d -r gs://mybucket1 gs://mybucket2 |
| 94 |
| 95 You can also mirror data across local directories: |
| 96 |
| 97 gsutil rsync -d -r dir1 dir2 |
| 98 |
| 99 To mirror your content across clouds: |
| 100 |
| 101 gsutil rsync -d -r gs://my-gs-bucket s3://my-s3-bucket |
| 102 |
| 103 Note: If you are synchronizing a large amount of data between clouds you might |
| 104 consider setting up a |
| 105 `Google Compute Engine <https://cloud.google.com/products/compute-engine>`_ |
| 106 account and running gsutil there. Since cross-provider gsutil data transfers |
| 107 flow through the machine where gsutil is running, doing this can make your |
| 108 transfer run singificantly faster than running gsutil on your local |
| 109 workstation. |
| 110 |
| 111 |
| 112 <B>CHECKSUM VALIDATION AND FAILURE HANDLING</B> |
| 113 At the end of every upload or download, the gsutil rsync command validates |
| 114 that the checksum of the source file/object matches the checksum of the |
| 115 destination file/object. If the checksums do not match, gsutil will delete |
| 116 the invalid copy and print a warning message. This very rarely happens, but |
| 117 if it does, please contact gs-team@google.com. |
| 118 |
| 119 The rsync command will retry when failures occur, but if enough failures |
| 120 happen during a particular copy or delete operation the command will skip that |
| 121 object and move on. At the end of the synchronization run if any failures were |
| 122 not successfully retried, the rsync command will report the count of failures, |
| 123 and exit with non-zero status. At this point you can run the rsync command |
| 124 again, and it will attempt any remaining needed copy and/or delete operations. |
| 125 |
| 126 Note that there are cases where retrying will never succeed, such as if you |
| 127 don't have write permission to the destination bucket or if the destination |
| 128 path for some objects is longer than the maximum allowed length. |
| 129 |
| 130 For more details about gsutil's retry handling, please see |
| 131 "gsutil help retries". |
| 132 |
| 133 |
| 134 <B>CHANGE DETECTION ALGORITHM</B> |
| 135 To determine if a file or object has changed gsutil rsync first checks whether |
| 136 the source and destination sizes match. If they match, it next checks if their |
| 137 checksums match, using whatever checksums are available (see below). Unlike |
| 138 the Unix rsync command, gsutil rsync does not use timestamps to determine if |
| 139 the file/object changed, because the GCS API does not permit the caller to set |
| 140 an object's timestamp (hence, timestamps of identical files/objects cannot be |
| 141 made to match). |
| 142 |
| 143 Checksums will not be available in two cases: |
| 144 |
| 145 1. When synchronizing to or from a file system. By default, gsutil does not |
| 146 checksum files, because of the slowdown caused when working with large |
| 147 files. You can cause gsutil to checksum files by using the |
| 148 gsutil rsync -c option, at the cost of increased local disk I/O and run |
| 149 time when working with large files. |
| 150 |
| 151 2. When comparing composite GCS objects with objects at a cloud provider that |
| 152 does not support CRC32C (which is the only checksum available for composite |
| 153 objects). See 'gsutil help compose' for details about composite objects. |
| 154 |
| 155 |
| 156 <B>COPYING IN THE CLOUD AND METADATA PRESERVATION</B> |
| 157 If both the source and destination URL are cloud URLs from the same provider, |
| 158 gsutil copies data "in the cloud" (i.e., without downloading to and uploading |
| 159 from the machine where you run gsutil). In addition to the performance and |
| 160 cost advantages of doing this, copying in the cloud preserves metadata (like |
| 161 Content-Type and Cache-Control). In contrast, when you download data from the |
| 162 cloud it ends up in a file, which has no associated metadata. Thus, unless you |
| 163 have some way to hold on to or re-create that metadata, synchronizing a bucket |
| 164 to a directory in the local file system will not retain the metadata. |
| 165 |
| 166 Note that by default, the gsutil rsync command does not copy the ACLs of |
| 167 objects being synchronized and instead will use the default bucket ACL (see |
| 168 "gsutil help defacl"). You can override this behavior with the -p option (see |
| 169 OPTIONS below). |
| 170 |
| 171 |
| 172 <B>SLOW CHECKSUMS</B> |
| 173 If you find that CRC32C checksum computation runs slowly, this is likely |
| 174 because you don't have a compiled CRC32c on your system. Try running: |
| 175 |
| 176 gsutil ver -l |
| 177 |
| 178 If the output contains: |
| 179 |
| 180 compiled crcmod: False |
| 181 |
| 182 you are running a Python library for computing CRC32C, which is much slower |
| 183 than using the compiled code. For information on getting a compiled CRC32C |
| 184 implementation, see 'gsutil help crc32c'. |
| 185 |
| 186 |
| 187 <B>LIMITATIONS</B> |
| 188 1. The gsutil rsync command doesn't make the destination object's timestamps |
| 189 match those of the source object (it can't; timestamp setting is not |
| 190 allowed by the GCS API). |
| 191 |
| 192 2. The gsutil rsync command ignores versioning, synchronizing only the live |
| 193 object versions in versioned buckets. |
| 194 |
| 195 |
| 196 <B>OPTIONS</B> |
| 197 -c Causes the rsync command to compute checksums for files if the |
| 198 size of source and destination match, and then compare |
| 199 checksums. This option increases local disk I/O and run time |
| 200 if either src_url or dst_url are on the local file system. |
| 201 |
| 202 -C If an error occurs, continue to attempt to copy the remaining |
| 203 files. If errors occurred, gsutil's exit status will be non-zero |
| 204 even if this flag is set. This option is implicitly set when |
| 205 running "gsutil -m rsync...". Note: -C only applies to the |
| 206 actual copying operation. If an error occurs while iterating |
| 207 over the files in the local directory (e.g., invalid Unicode |
| 208 file name) gsutil will print an error message and abort. |
| 209 |
| 210 -d Delete extra files under dst_url not found under src_url. By |
| 211 default extra files are not deleted. |
| 212 |
| 213 -e Exclude symlinks. When specified, symbolic links will be |
| 214 ignored. |
| 215 |
| 216 -n Causes rsync to run in "dry run" mode, i.e., just outputting |
| 217 what would be copied or deleted without actually doing any |
| 218 copying/deleting. |
| 219 |
| 220 -p Causes ACLs to be preserved when synchronizing in the cloud. |
| 221 Note that this option has performance and cost implications when |
| 222 using the XML API, as it requires separate HTTP calls for |
| 223 interacting with ACLs. The performance issue can be mitigated to |
| 224 some degree by using gsutil -m rsync to cause parallel |
| 225 synchronization. Also, this option only works if you have OWNER |
| 226 access to all of the objects that are copied. |
| 227 |
| 228 You can avoid the additional performance and cost of using |
| 229 rsync -p if you want all objects in the destination bucket to |
| 230 end up with the same ACL by setting a default object ACL on that |
| 231 bucket instead of using rsync -p. See 'help gsutil defacl'. |
| 232 |
| 233 -R, -r Causes directories, buckets, and bucket subdirectories to be |
| 234 synchronized recursively. If you neglect to use this option |
| 235 gsutil will make only the top-level directory in the source |
| 236 and destination URLs match, skipping any sub-directories. |
| 237 """) |
| 238 |
| 239 |
| 240 class _DiffAction(object): |
| 241 COPY = 'copy' |
| 242 REMOVE = 'remove' |
| 243 |
| 244 |
# Placeholder written to listing lines when a size/checksum field is missing.
_NA = '-'
# Buffer size in bytes; presumably for buffered temp-file output — not
# referenced within this chunk, so usage is elsewhere in the file.
_OUTPUT_BUFFER_SIZE = 64 * 1024
# Emit a progress log line after every this many listed files/objects.
_PROGRESS_REPORT_LISTING_COUNT = 10000
| 248 |
| 249 |
| 250 class _DiffToApply(object): |
| 251 """Class that encapsulates info needed to apply diff for one object.""" |
| 252 |
| 253 def __init__(self, src_url_str, dst_url_str, diff_action): |
| 254 """Constructor. |
| 255 |
| 256 Args: |
| 257 src_url_str: The source URL string, or None if diff_action is REMOVE. |
| 258 dst_url_str: The destination URL string. |
| 259 diff_action: _DiffAction to be applied. |
| 260 """ |
| 261 self.src_url_str = src_url_str |
| 262 self.dst_url_str = dst_url_str |
| 263 self.diff_action = diff_action |
| 264 |
| 265 |
def _DiffToApplyArgChecker(command_instance, diff_to_apply):
  """Arg checker that skips symlinks if -e flag specified."""
  skip_symlinks = command_instance.exclude_symlinks
  # REMOVE actions carry no src URL, so there is nothing to inspect for them.
  if diff_to_apply.diff_action == _DiffAction.REMOVE or not skip_symlinks:
    return True
  exp_src_url = StorageUrlFromString(diff_to_apply.src_url_str)
  if exp_src_url.IsFileUrl() and os.path.islink(exp_src_url.object_name):
    command_instance.logger.info('Skipping symbolic link %s...', exp_src_url)
    return False
  return True
| 277 |
| 278 |
def _ComputeNeededFileChecksums(logger, src_url_str, src_size, src_crc32c,
                                src_md5, dst_url_str, dst_size, dst_crc32c,
                                dst_md5):
  """Computes any file checksums needed by _ObjectsMatch.

  For a file URL side we compute whichever digest (CRC32C or MD5) the other
  side already has available, so the two can be compared. Note: callers pass
  src_size == dst_size (this is only reached after a size match).

  Args:
    logger: logging.logger for outputting log messages.
    src_url_str: Source URL string.
    src_size: Source size
    src_crc32c: Source CRC32c.
    src_md5: Source MD5.
    dst_url_str: Destination URL string.
    dst_size: Destination size
    dst_crc32c: Destination CRC32c.
    dst_md5: Destination MD5.

  Returns:
    (src_crc32c, src_md5, dst_crc32c, dst_md5)
  """
  src_url = StorageUrlFromString(src_url_str)
  dst_url = StorageUrlFromString(dst_url_str)
  if src_url.IsFileUrl():
    # Prefer CRC32C when the destination has one (or is also a file);
    # otherwise fall back to MD5.
    if dst_crc32c != _NA or dst_url.IsFileUrl():
      if src_size > TEN_MB:
        # Fixed log message: this branch computes CRC32C (the original
        # incorrectly logged "Computing MD5").
        logger.info('Computing CRC32C for %s...', src_url_str)
      with open(src_url.object_name, 'rb') as fp:
        src_crc32c = CalculateB64EncodedCrc32cFromContents(fp)
    elif dst_md5 != _NA or dst_url.IsFileUrl():
      # We read the *source* file here, so the size check and log message
      # must reference the source (the original used dst_size/dst_url_str).
      if src_size > TEN_MB:
        logger.info('Computing MD5 for %s...', src_url_str)
      with open(src_url.object_name, 'rb') as fp:
        src_md5 = CalculateB64EncodedMd5FromContents(fp)
  if dst_url.IsFileUrl():
    if src_crc32c != _NA:
      # We read the *destination* file here, so reference the destination
      # (the original used src_size/src_url_str).
      if dst_size > TEN_MB:
        logger.info('Computing CRC32C for %s...', dst_url_str)
      with open(dst_url.object_name, 'rb') as fp:
        dst_crc32c = CalculateB64EncodedCrc32cFromContents(fp)
    elif src_md5 != _NA:
      if dst_size > TEN_MB:
        # Fixed log message: this branch computes MD5 (the original
        # incorrectly logged "Computing CRC32C").
        logger.info('Computing MD5 for %s...', dst_url_str)
      with open(dst_url.object_name, 'rb') as fp:
        dst_md5 = CalculateB64EncodedMd5FromContents(fp)
  return (src_crc32c, src_md5, dst_crc32c, dst_md5)
| 323 |
| 324 |
def _ListUrlRootFunc(cls, args_tuple, thread_state=None):
  """Worker function for listing files/objects under a URL to be sync'd.

  Outputs sorted list to out_file_name, formatted per _BuildTmpOutputLine. We
  sort the listed URLs because we don't want to depend on consistent sort
  order across file systems and cloud providers.

  Args:
    cls: Command instance.
    args_tuple: (url_str, out_file_name, desc), where url_str is URL string to
                list; out_file_name is name of file to which sorted output
                should be written; desc is 'source' or 'destination'.
    thread_state: gsutil Cloud API instance to use.
  """
  gsutil_api = GetCloudApiInstance(cls, thread_state=thread_state)
  (url_str, out_file_name, desc) = args_tuple
  # We sort while iterating over url_str, allowing parallelism of batched
  # sorting with collecting the listing.
  # Use a with statement so the output file is closed even if listing or
  # sorting raises (the original leaked the handle on error).
  with io.open(out_file_name, mode='w', encoding=UTF8) as out_file:
    _BatchSort(_FieldedListingIterator(cls, gsutil_api, url_str, desc),
               out_file)
| 346 |
| 347 |
def _FieldedListingIterator(cls, gsutil_api, url_str, desc):
  """Iterator over url_str outputting lines formatted per _BuildTmpOutputLine.

  Args:
    cls: Command instance.
    gsutil_api: gsutil Cloud API instance to use for bucket listing.
    url_str: The URL string over which to iterate.
    desc: 'source' or 'destination'.

  Yields:
    Output line formatted per _BuildTmpOutputLine.
  """
  # '**' recurses arbitrarily deep when -r/-R was given; '*' lists only the
  # top level.
  suffix = '**' if cls.recursion_requested else '*'
  wildcard = '%s/%s' % (url_str.rstrip('/\\'), suffix)
  count = 0
  object_iter = CreateWildcardIterator(
      wildcard, gsutil_api, debug=cls.debug,
      project_id=cls.project_id).IterObjects(
          # Request just the needed fields, to reduce bandwidth usage.
          bucket_listing_fields=['crc32c', 'md5Hash', 'name', 'size'])
  for blr in object_iter:
    url = blr.storage_url
    # Various GUI tools (like the GCS web console) create placeholder objects
    # ending with '/' when the user creates an empty directory. Normally these
    # tools should delete those placeholders once objects have been written
    # "under" the directory, but sometimes the placeholders are left around.
    # We need to filter them out here, otherwise if the user tries to rsync
    # from GCS to a local directory it will result in a directory/file
    # conflict (e.g., trying to download an object called "mydata/" where the
    # local directory "mydata" exists).
    if IsCloudSubdirPlaceholder(url, blr=blr):
      cls.logger.info('Skipping cloud sub-directory placeholder object %s', url)
      continue
    if (cls.exclude_symlinks and url.IsFileUrl()
        and os.path.islink(url.object_name)):
      continue
    count += 1
    if count % _PROGRESS_REPORT_LISTING_COUNT == 0:
      cls.logger.info('At %s listing %d...', desc, count)
    yield _BuildTmpOutputLine(blr)
| 389 |
| 390 |
def _BuildTmpOutputLine(blr):
  """Builds line to output to temp file for given BucketListingRef.

  Args:
    blr: The BucketListingRef.

  Returns:
    The output line, formatted as quote_plus(URL)<sp>size<sp>crc32c<sp>md5
    where crc32c will only be present for GCS URLs, and md5 will only be
    present for cloud URLs that aren't composite objects. A missing field is
    populated with '-'.
  """
  url = blr.storage_url
  crc32c, md5 = _NA, _NA
  if url.IsFileUrl():
    size = os.path.getsize(url.object_name)
  elif url.IsCloudUrl():
    obj = blr.root_object
    size = obj.size
    crc32c = obj.crc32c or _NA
    md5 = obj.md5Hash or _NA
  else:
    raise CommandException('Got unexpected URL type (%s)' % url.scheme)
  # Percent-encode the URL so the space-separated record format stays parsable.
  quoted_url = urllib.quote_plus(url.url_string.encode(UTF8))
  return '%s %d %s %s\n' % (quoted_url, size, crc32c, md5)
| 416 |
| 417 |
# pylint: disable=bare-except
def _BatchSort(in_iter, out_file):
  """Sorts input lines from in_iter and outputs to out_file.

  Sorts in batches as input arrives, so input file does not need to be loaded
  into memory all at once. Derived from Python Recipe 466302: Sorting big
  files the Python 2.4 way by Nicolas Lehuen.

  Sorted format is per _BuildTmpOutputLine. We're sorting on the entire line
  when we could just sort on the first record (URL); but the sort order is
  identical either way.

  Args:
    in_iter: Input iterator.
    out_file: Output file.

  Raises:
    CommandException: If the merge needs so many open chunk files that the
      process runs out of file descriptors (errno.EMFILE).
  """
  # Note: If chunk_files gets very large we can run out of open FDs. See .boto
  # file comments about rsync_buffer_lines. If increasing rsync_buffer_lines
  # doesn't suffice (e.g., for someone synchronizing with a really large
  # bucket), an option would be to make gsutil merge in passes, never
  # opening all chunk files simultaneously.
  buffer_size = config.getint('GSUtil', 'rsync_buffer_lines', 32000)
  chunk_files = []
  try:
    while True:
      # Pull up to buffer_size lines, sort them in memory, and spill the
      # sorted batch into its own numbered temp chunk file.
      current_chunk = sorted(islice(in_iter, buffer_size))
      if not current_chunk:
        break
      output_chunk = io.open('%s-%06i' % (out_file.name, len(chunk_files)),
                             mode='w+', encoding=UTF8)
      chunk_files.append(output_chunk)
      # unicode() so the text-mode io stream accepts the data under Python 2.
      output_chunk.writelines(unicode(''.join(current_chunk)))
      output_chunk.flush()
      # Rewind so the chunk can be read back during the merge below.
      output_chunk.seek(0)
    # Each chunk file is internally sorted, so an N-way heap merge over the
    # open file iterators yields fully sorted output.
    out_file.writelines(heapq.merge(*chunk_files))
  except IOError as e:
    if e.errno == errno.EMFILE:
      raise CommandException('\n'.join(textwrap.wrap(
          'Synchronization failed because too many open file handles were '
          'needed while building synchronization state. Please see the '
          'comments about rsync_buffer_lines in your .boto config file for a '
          'possible way to address this problem.')))
    raise
  finally:
    # Best-effort cleanup of the temp chunk files; failures here are ignored
    # deliberately so they never mask the real outcome.
    for chunk_file in chunk_files:
      try:
        chunk_file.close()
        os.remove(chunk_file.name)
      except:
        pass
| 468 |
| 469 |
class _DiffIterator(object):
  """Iterator yielding sequence of _DiffToApply objects."""

  def __init__(self, command_obj, base_src_url, base_dst_url):
    """Constructor.

    Lists source and destination in parallel into sorted temp files so that
    __iter__ can perform a single streaming merge pass over the two listings.

    Args:
      command_obj: The Command instance running this rsync.
      base_src_url: Source StorageUrl for the synchronization.
      base_dst_url: Destination StorageUrl for the synchronization.
    """
    self.command_obj = command_obj
    self.compute_checksums = command_obj.compute_checksums
    self.delete_extras = command_obj.delete_extras
    self.recursion_requested = command_obj.recursion_requested
    self.logger = self.command_obj.logger
    self.base_src_url = base_src_url
    self.base_dst_url = base_dst_url
    self.logger.info('Building synchronization state...')

    (src_fh, self.sorted_list_src_file_name) = tempfile.mkstemp(
        prefix='gsutil-rsync-src-')
    (dst_fh, self.sorted_list_dst_file_name) = tempfile.mkstemp(
        prefix='gsutil-rsync-dst-')
    # Close the file handles; the file will be opened in write mode by
    # _ListUrlRootFunc.
    os.close(src_fh)
    os.close(dst_fh)

    # Build sorted lists of src and dst URLs in parallel. To do this, pass args
    # to _ListUrlRootFunc as tuple (url_str, out_file_name, desc).
    args_iter = iter([
        (self.base_src_url.url_string, self.sorted_list_src_file_name,
         'source'),
        (self.base_dst_url.url_string, self.sorted_list_dst_file_name,
         'destination')
    ])
    command_obj.Apply(_ListUrlRootFunc, args_iter, _RootListingExceptionHandler,
                      arg_checker=DummyArgChecker,
                      parallel_operations_override=True,
                      fail_on_error=True)

    self.sorted_list_src_file = open(self.sorted_list_src_file_name, 'r')
    self.sorted_list_dst_file = open(self.sorted_list_dst_file_name, 'r')

    # Wrap iterators in PluralityCheckableIterator so we can check emptiness.
    self.sorted_src_urls_it = PluralityCheckableIterator(
        iter(self.sorted_list_src_file))
    self.sorted_dst_urls_it = PluralityCheckableIterator(
        iter(self.sorted_list_dst_file))

  # pylint: disable=bare-except
  def CleanUpTempFiles(self):
    """Cleans up temp files.

    This function allows the main (RunCommand) function to clean up at end of
    operation. This is necessary because tempfile.NamedTemporaryFile doesn't
    allow the created file to be re-opened in read mode on Windows, so we have
    to use tempfile.mkstemp, which doesn't automatically delete temp files (see
    https://mail.python.org/pipermail/python-list/2005-December/336958.html).
    """
    try:
      self.sorted_list_src_file.close()
      self.sorted_list_dst_file.close()
      for fname in (self.sorted_list_src_file_name,
                    self.sorted_list_dst_file_name):
        os.unlink(fname)
    except:
      # Cleanup is best-effort; never let it mask the real result of the run.
      pass

  def _ParseTmpFileLine(self, line):
    """Parses output from _BuildTmpOutputLine.

    Parses into tuple:
      (URL, size, crc32c, md5)
    where crc32c and/or md5 can be _NA.

    Args:
      line: The line to parse.

    Returns:
      Parsed tuple: (url, size, crc32c, md5)
    """
    # URLs were percent-encoded by _BuildTmpOutputLine, so they never contain
    # the space separators this split relies on.
    (encoded_url, size, crc32c, md5) = line.split()
    return (urllib.unquote_plus(encoded_url).decode(UTF8),
            int(size), crc32c, md5.strip())

  def _WarnIfMissingCloudHash(self, url_str, crc32c, md5):
    """Warns if given url_str is a cloud URL and is missing both crc32c and md5.

    Args:
      url_str: Destination URL string.
      crc32c: Destination CRC32c.
      md5: Destination MD5.

    Returns:
      True if issued warning.
    """
    # One known way this can currently happen is when rsync'ing objects larger
    # than 5GB from S3 (for which the etag is not an MD5).
    if (StorageUrlFromString(url_str).IsCloudUrl()
        and crc32c == _NA and md5 == _NA):
      self.logger.warn(
          'Found no hashes to validate %s. Integrity cannot be assured without '
          'hashes.', url_str)
      return True
    return False

  def _ObjectsMatch(self, src_url_str, src_size, src_crc32c, src_md5,
                    dst_url_str, dst_size, dst_crc32c, dst_md5):
    """Returns True if src and dst objects are the same.

    Uses size plus whatever checksums are available.

    Args:
      src_url_str: Source URL string.
      src_size: Source size
      src_crc32c: Source CRC32c.
      src_md5: Source MD5.
      dst_url_str: Destination URL string.
      dst_size: Destination size
      dst_crc32c: Destination CRC32c.
      dst_md5: Destination MD5.

    Returns:
      True/False.
    """
    # Note: This function is called from __iter__, which is called from the
    # Command.Apply driver. Thus, all checksum computation will be run in a
    # single thread, which is good (having multiple threads concurrently
    # computing checksums would thrash the disk).
    if src_size != dst_size:
      return False
    if self.compute_checksums:
      # -c flag: fill in whichever file checksums are needed for comparison.
      (src_crc32c, src_md5, dst_crc32c, dst_md5) = _ComputeNeededFileChecksums(
          self.logger, src_url_str, src_size, src_crc32c, src_md5, dst_url_str,
          dst_size, dst_crc32c, dst_md5)
    if src_md5 != _NA and dst_md5 != _NA:
      self.logger.debug('Comparing md5 for %s and %s', src_url_str, dst_url_str)
      return src_md5 == dst_md5
    if src_crc32c != _NA and dst_crc32c != _NA:
      self.logger.debug(
          'Comparing crc32c for %s and %s', src_url_str, dst_url_str)
      return src_crc32c == dst_crc32c
    if not self._WarnIfMissingCloudHash(src_url_str, src_crc32c, src_md5):
      self._WarnIfMissingCloudHash(dst_url_str, dst_crc32c, dst_md5)
    # Without checksums to compare we depend only on basic size comparison.
    return True

  def __iter__(self):
    """Iterates over src/dst URLs and produces a _DiffToApply sequence.

    Yields:
      The _DiffToApply.
    """
    # Strip trailing slashes, if any, so we compute tail length against
    # consistent position regardless of whether trailing slashes were included
    # or not in URL.
    base_src_url_len = len(self.base_src_url.url_string.rstrip('/\\'))
    base_dst_url_len = len(self.base_dst_url.url_string.rstrip('/\\'))
    src_url_str = dst_url_str = None
    # Invariant: After each yield, the URLs in src_url_str, dst_url_str,
    # self.sorted_src_urls_it, and self.sorted_dst_urls_it are not yet
    # processed. Each time we encounter None in src_url_str or dst_url_str we
    # populate from the respective iterator, and we reset one or the other value
    # to None after yielding an action that disposes of that URL.
    while not self.sorted_src_urls_it.IsEmpty() or src_url_str is not None:
      if src_url_str is None:
        (src_url_str, src_size, src_crc32c, src_md5) = self._ParseTmpFileLine(
            self.sorted_src_urls_it.next())
        # Skip past base URL and normalize slashes so we can compare across
        # clouds/file systems (including Windows).
        src_url_str_to_check = src_url_str[base_src_url_len:].replace('\\', '/')
        dst_url_str_would_copy_to = copy_helper.ConstructDstUrl(
            self.base_src_url, StorageUrlFromString(src_url_str), True, True,
            self.base_dst_url, False, self.recursion_requested).url_string
      if self.sorted_dst_urls_it.IsEmpty():
        # We've reached end of dst URLs, so copy src to dst.
        yield _DiffToApply(
            src_url_str, dst_url_str_would_copy_to, _DiffAction.COPY)
        src_url_str = None
        continue
      if not dst_url_str:
        (dst_url_str, dst_size, dst_crc32c, dst_md5) = (
            self._ParseTmpFileLine(self.sorted_dst_urls_it.next()))
        # Skip past base URL and normalize slashes so we can compare across
        # clouds/file systems (including Windows).
        dst_url_str_to_check = dst_url_str[base_dst_url_len:].replace('\\', '/')

      # Both listings are sorted, so a lexicographic comparison of the
      # base-relative paths tells us which side is "ahead".
      if src_url_str_to_check < dst_url_str_to_check:
        # There's no dst object corresponding to src object, so copy src to dst.
        yield _DiffToApply(
            src_url_str, dst_url_str_would_copy_to, _DiffAction.COPY)
        src_url_str = None
      elif src_url_str_to_check > dst_url_str_to_check:
        # dst object without a corresponding src object, so remove dst if -d
        # option was specified.
        if self.delete_extras:
          yield _DiffToApply(None, dst_url_str, _DiffAction.REMOVE)
        dst_url_str = None
      else:
        # There is a dst object corresponding to src object, so check if objects
        # match.
        if self._ObjectsMatch(
            src_url_str, src_size, src_crc32c, src_md5,
            dst_url_str, dst_size, dst_crc32c, dst_md5):
          # Continue iterating without yielding a _DiffToApply.
          src_url_str = None
          dst_url_str = None
        else:
          yield _DiffToApply(src_url_str, dst_url_str, _DiffAction.COPY)
          dst_url_str = None

    # If -d option specified any files/objects left in dst iteration should be
    # removed.
    if not self.delete_extras:
      return
    if dst_url_str:
      yield _DiffToApply(None, dst_url_str, _DiffAction.REMOVE)
      dst_url_str = None
    for line in self.sorted_dst_urls_it:
      (dst_url_str, _, _, _) = self._ParseTmpFileLine(line)
      yield _DiffToApply(None, dst_url_str, _DiffAction.REMOVE)
| 686 |
| 687 |
def _RsyncFunc(cls, diff_to_apply, thread_state=None):
  """Worker function for performing the actual copy and remove operations.

  Args:
    cls: Command instance.
    diff_to_apply: The _DiffToApply to execute.
    thread_state: gsutil Cloud API instance to use.

  Raises:
    CommandException: If diff_to_apply carries an unrecognized diff_action.
  """
  gsutil_api = GetCloudApiInstance(cls, thread_state=thread_state)
  dst_url_str = diff_to_apply.dst_url_str
  dst_url = StorageUrlFromString(dst_url_str)
  if diff_to_apply.diff_action == _DiffAction.REMOVE:
    if cls.dryrun:
      cls.logger.info('Would remove %s', dst_url)
    else:
      cls.logger.info('Removing %s', dst_url)
      if dst_url.IsFileUrl():
        os.unlink(dst_url.object_name)
      else:
        try:
          gsutil_api.DeleteObject(
              dst_url.bucket_name, dst_url.object_name,
              generation=dst_url.generation, provider=dst_url.scheme)
        except NotFoundException:
          # If the object happened to be deleted by an external process, this
          # is fine because it moves us closer to the desired state.
          pass
  elif diff_to_apply.diff_action == _DiffAction.COPY:
    src_url_str = diff_to_apply.src_url_str
    src_url = StorageUrlFromString(src_url_str)
    if cls.dryrun:
      cls.logger.info('Would copy %s to %s', src_url, dst_url)
    else:
      copy_helper.PerformCopy(cls.logger, src_url, dst_url, gsutil_api, cls,
                              _RsyncExceptionHandler,
                              headers=cls.headers)
  else:
    # Use %s, not %d: diff_action values are strings ('copy'/'remove'), so the
    # original %d format would itself raise a TypeError instead of surfacing
    # this CommandException.
    raise CommandException('Got unexpected DiffAction (%s)'
                           % diff_to_apply.diff_action)
| 721 |
| 722 |
| 723 def _RootListingExceptionHandler(cls, e): |
| 724 """Simple exception handler for exceptions during listing URLs to sync.""" |
| 725 cls.logger.error(str(e)) |
| 726 |
| 727 |
| 728 def _RsyncExceptionHandler(cls, e): |
| 729 """Simple exception handler to allow post-completion status.""" |
| 730 cls.logger.error(str(e)) |
| 731 cls.op_failure_count += 1 |
| 732 cls.logger.debug('\n\nEncountered exception while syncing:\n%s\n', |
| 733 traceback.format_exc()) |
| 734 |
| 735 |
class RsyncCommand(Command):
  """Implementation of gsutil rsync command."""

  # Command specification. See base class for documentation.
  command_spec = Command.CreateCommandSpec(
      'rsync',
      command_name_aliases=[],
      min_args=2,
      max_args=2,
      supported_sub_args='cCdenprR',
      file_url_ok=True,
      provider_url_ok=False,
      urls_start_arg=0,
      gs_api_support=[ApiSelector.XML, ApiSelector.JSON],
      gs_default_api=ApiSelector.JSON,
  )
  # Help specification. See help_provider.py for documentation.
  help_spec = Command.HelpSpec(
      help_name='rsync',
      help_name_aliases=['sync', 'synchronize'],
      help_type='command_help',
      help_one_line_summary='Synchronize content of two buckets/directories',
      help_text=_DETAILED_HELP_TEXT,
      subcommand_help_text={},
  )
  # Running total of bytes transferred by this command.
  total_bytes_transferred = 0

  def _InsistContainer(self, url_str):
    """Sanity checks that URL names an existing container.

    Args:
      url_str: URL string to check.

    Returns:
      URL for checked string.

    Raises:
      CommandException: if url_str doesn't name an existing container.
    """
    (url, have_existing_container) = (
        copy_helper.ExpandUrlToSingleBlr(url_str, self.gsutil_api, self.debug,
                                         self.project_id))
    if not have_existing_container:
      raise CommandException(
          'arg (%s) does not name a directory, bucket, or bucket subdir.'
          % url_str)
    return url

  def RunCommand(self):
    """Command entry point for the rsync command.

    Returns:
      0 implicitly (via Command framework) on success.

    Raises:
      CommandException: if either argument doesn't name an existing
          container, or if any copy/remove operation failed.
    """
    self._ParseOpts()
    if self.compute_checksums and not UsingCrcmodExtension(crcmod):
      self.logger.warn(SLOW_CRCMOD_WARNING)

    src_url = self._InsistContainer(self.args[0])
    dst_url = self._InsistContainer(self.args[1])

    # Tracks if any copy or rm operations failed.
    self.op_failure_count = 0

    # List of attributes to share/manage across multiple processes in
    # parallel (-m) mode.
    shared_attrs = ['op_failure_count']

    # Perform sync requests in parallel (-m) mode, if requested, using
    # configured number of parallel processes and threads. Otherwise,
    # perform requests with sequential function calls in current process.
    diff_iterator = _DiffIterator(self, src_url, dst_url)
    self.logger.info('Starting synchronization')
    try:
      self.Apply(_RsyncFunc, diff_iterator, _RsyncExceptionHandler,
                 shared_attrs, arg_checker=_DiffToApplyArgChecker,
                 fail_on_error=True)
    finally:
      # Always clean up the temp files holding the sorted listings, even if
      # the sync was interrupted by an exception.
      diff_iterator.CleanUpTempFiles()

    if self.op_failure_count:
      # Bug fix: pluralize only for multiple failures. The previous
      # truthiness test ('s' if self.op_failure_count else '') was always
      # True inside this branch, yielding "1 files/objects ...".
      plural_str = 's' if self.op_failure_count > 1 else ''
      raise CommandException(
          '%d file%s/object%s could not be copied/removed.' %
          (self.op_failure_count, plural_str, plural_str))

  def _ParseOpts(self):
    """Parses sub-opts, setting per-command state flags on self.

    Returns:
      CopyHelperOpts constructed from the parsed options.
    """
    # exclude_symlinks is handled by Command parent class, so save in Command
    # state rather than CopyHelperOpts.
    self.exclude_symlinks = False
    # continue_on_error is handled by Command parent class, so save in Command
    # state rather than CopyHelperOpts.
    self.continue_on_error = False
    self.delete_extras = False
    preserve_acl = False
    self.compute_checksums = False
    self.dryrun = False
    # self.recursion_requested is initialized in command.py (so it can be
    # checked in parent class for all commands).

    if self.sub_opts:
      for o, _ in self.sub_opts:
        if o == '-c':
          self.compute_checksums = True
        # Note: In gsutil cp command this is specified using -c but here we use
        # -C so we can use -c for checksum arg (to be consistent with Unix rsync
        # command options).
        elif o == '-C':
          self.continue_on_error = True
        elif o == '-d':
          self.delete_extras = True
        elif o == '-e':
          self.exclude_symlinks = True
        elif o == '-n':
          self.dryrun = True
        elif o == '-p':
          preserve_acl = True
        elif o == '-r' or o == '-R':
          self.recursion_requested = True
    return CreateCopyHelperOpts(preserve_acl=preserve_acl)
| OLD | NEW |