Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(110)

Side by Side Diff: gslib/commands/rsync.py

Issue 698893003: Update checked in version of gsutil to version 4.6 (Closed) Base URL: http://dart.googlecode.com/svn/third_party/gsutil/
Patch Set: Created 6 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « gslib/commands/rm.py ('k') | gslib/commands/setmeta.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Property Changes:
Added: svn:eol-style
+ LF
OLDNEW
(Empty)
1 # -*- coding: utf-8 -*-
2 # Copyright 2014 Google Inc. All Rights Reserved.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 """Implementation of Unix-like rsync command."""
16
17 from __future__ import absolute_import
18
19 import errno
20 import heapq
21 import io
22 from itertools import islice
23 import os
24 import tempfile
25 import textwrap
26 import traceback
27 import urllib
28
29 from boto import config
30 import crcmod
31
32 from gslib import copy_helper
33 from gslib.cloud_api import NotFoundException
34 from gslib.command import Command
35 from gslib.command import DummyArgChecker
36 from gslib.copy_helper import CreateCopyHelperOpts
37 from gslib.cs_api_map import ApiSelector
38 from gslib.exception import CommandException
39 from gslib.hashing_helper import CalculateB64EncodedCrc32cFromContents
40 from gslib.hashing_helper import CalculateB64EncodedMd5FromContents
41 from gslib.hashing_helper import SLOW_CRCMOD_WARNING
42 from gslib.plurality_checkable_iterator import PluralityCheckableIterator
43 from gslib.storage_url import StorageUrlFromString
44 from gslib.util import GetCloudApiInstance
45 from gslib.util import IsCloudSubdirPlaceholder
46 from gslib.util import TEN_MB
47 from gslib.util import UsingCrcmodExtension
48 from gslib.util import UTF8
49 from gslib.wildcard_iterator import CreateWildcardIterator
50
51
52 _DETAILED_HELP_TEXT = ("""
53 <B>SYNOPSIS</B>
54 gsutil rsync [-c] [-C] [-d] [-e] [-n] [-p] [-R] src_url dst_url
55
56
57 <B>DESCRIPTION</B>
58 The gsutil rsync command makes the contents under dst_url the same as the
59 contents under src_url, by copying any missing files/objects, and (if the
60 -d option is specified) deleting any extra files/objects. For example, to
61 make gs://mybucket/data match the contents of the local directory "data"
62 you could do:
63
64 gsutil rsync -d data gs://mybucket/data
65
66 To recurse into directories use the -r option:
67
68 gsutil rsync -d -r data gs://mybucket/data
69
70 To copy only new/changed files without deleting extra files from
71 gs://mybucket/data leave off the -d option:
72
73 gsutil rsync -r data gs://mybucket/data
74
75 If you have a large number of objects to synchronize you might want to use the
76 gsutil -m option, to perform parallel (multi-threaded/multi-processing)
77 synchronization:
78
79 gsutil -m rsync -d -r data gs://mybucket/data
80
81 The -m option typically will provide a large performance boost if either the
82 source or destination (or both) is a cloud URL. If both source and
83 destination are file URLs the -m option will typically thrash the disk and
84 slow synchronization down.
85
86 To make the local directory "data" the same as the contents of
87 gs://mybucket/data:
88
89 gsutil rsync -d -r gs://mybucket/data data
90
91 To make the contents of gs://mybucket2 the same as gs://mybucket1:
92
93 gsutil rsync -d -r gs://mybucket1 gs://mybucket2
94
95 You can also mirror data across local directories:
96
97 gsutil rsync -d -r dir1 dir2
98
99 To mirror your content across clouds:
100
101 gsutil rsync -d -r gs://my-gs-bucket s3://my-s3-bucket
102
103 Note: If you are synchronizing a large amount of data between clouds you might
104 consider setting up a
105 `Google Compute Engine <https://cloud.google.com/products/compute-engine>`_
106 account and running gsutil there. Since cross-provider gsutil data transfers
107 flow through the machine where gsutil is running, doing this can make your
108 transfer run singificantly faster than running gsutil on your local
109 workstation.
110
111
112 <B>CHECKSUM VALIDATION AND FAILURE HANDLING</B>
113 At the end of every upload or download, the gsutil rsync command validates
114 that the checksum of the source file/object matches the checksum of the
115 destination file/object. If the checksums do not match, gsutil will delete
116 the invalid copy and print a warning message. This very rarely happens, but
117 if it does, please contact gs-team@google.com.
118
119 The rsync command will retry when failures occur, but if enough failures
120 happen during a particular copy or delete operation the command will skip that
121 object and move on. At the end of the synchronization run if any failures were
122 not successfully retried, the rsync command will report the count of failures,
123 and exit with non-zero status. At this point you can run the rsync command
124 again, and it will attempt any remaining needed copy and/or delete operations.
125
126 Note that there are cases where retrying will never succeed, such as if you
127 don't have write permission to the destination bucket or if the destination
128 path for some objects is longer than the maximum allowed length.
129
130 For more details about gsutil's retry handling, please see
131 "gsutil help retries".
132
133
134 <B>CHANGE DETECTION ALGORITHM</B>
135 To determine if a file or object has changed gsutil rsync first checks whether
136 the source and destination sizes match. If they match, it next checks if their
137 checksums match, using whatever checksums are available (see below). Unlike
138 the Unix rsync command, gsutil rsync does not use timestamps to determine if
139 the file/object changed, because the GCS API does not permit the caller to set
140 an object's timestamp (hence, timestamps of identical files/objects cannot be
141 made to match).
142
143 Checksums will not be available in two cases:
144
145 1. When synchronizing to or from a file system. By default, gsutil does not
146 checksum files, because of the slowdown caused when working with large
147 files. You can cause gsutil to checksum files by using the
148 gsutil rsync -c option, at the cost of increased local disk I/O and run
149 time when working with large files.
150
151 2. When comparing composite GCS objects with objects at a cloud provider that
152 does not support CRC32C (which is the only checksum available for composite
153 objects). See 'gsutil help compose' for details about composite objects.
154
155
156 <B>COPYING IN THE CLOUD AND METADATA PRESERVATION</B>
157 If both the source and destination URL are cloud URLs from the same provider,
158 gsutil copies data "in the cloud" (i.e., without downloading to and uploading
159 from the machine where you run gsutil). In addition to the performance and
160 cost advantages of doing this, copying in the cloud preserves metadata (like
161 Content-Type and Cache-Control). In contrast, when you download data from the
162 cloud it ends up in a file, which has no associated metadata. Thus, unless you
163 have some way to hold on to or re-create that metadata, synchronizing a bucket
164 to a directory in the local file system will not retain the metadata.
165
166 Note that by default, the gsutil rsync command does not copy the ACLs of
167 objects being synchronized and instead will use the default bucket ACL (see
168 "gsutil help defacl"). You can override this behavior with the -p option (see
169 OPTIONS below).
170
171
172 <B>SLOW CHECKSUMS</B>
173 If you find that CRC32C checksum computation runs slowly, this is likely
174 because you don't have a compiled CRC32c on your system. Try running:
175
176 gsutil ver -l
177
178 If the output contains:
179
180 compiled crcmod: False
181
182 you are running a Python library for computing CRC32C, which is much slower
183 than using the compiled code. For information on getting a compiled CRC32C
184 implementation, see 'gsutil help crc32c'.
185
186
187 <B>LIMITATIONS</B>
188 1. The gsutil rsync command doesn't make the destination object's timestamps
189 match those of the source object (it can't; timestamp setting is not
190 allowed by the GCS API).
191
192 2. The gsutil rsync command ignores versioning, synchronizing only the live
193 object versions in versioned buckets.
194
195
196 <B>OPTIONS</B>
197 -c Causes the rsync command to compute checksums for files if the
198 size of source and destination match, and then compare
199 checksums. This option increases local disk I/O and run time
200 if either src_url or dst_url are on the local file system.
201
202 -C If an error occurs, continue to attempt to copy the remaining
203 files. If errors occurred, gsutil's exit status will be non-zero
204 even if this flag is set. This option is implicitly set when
205 running "gsutil -m rsync...". Note: -C only applies to the
206 actual copying operation. If an error occurs while iterating
207 over the files in the local directory (e.g., invalid Unicode
208 file name) gsutil will print an error message and abort.
209
210 -d Delete extra files under dst_url not found under src_url. By
211 default extra files are not deleted.
212
213 -e Exclude symlinks. When specified, symbolic links will be
214 ignored.
215
216 -n Causes rsync to run in "dry run" mode, i.e., just outputting
217 what would be copied or deleted without actually doing any
218 copying/deleting.
219
220 -p Causes ACLs to be preserved when synchronizing in the cloud.
221 Note that this option has performance and cost implications when
222 using the XML API, as it requires separate HTTP calls for
223 interacting with ACLs. The performance issue can be mitigated to
224 some degree by using gsutil -m rsync to cause parallel
225 synchronization. Also, this option only works if you have OWNER
226 access to all of the objects that are copied.
227
228 You can avoid the additional performance and cost of using
229 rsync -p if you want all objects in the destination bucket to
230 end up with the same ACL by setting a default object ACL on that
231 bucket instead of using rsync -p. See 'help gsutil defacl'.
232
233 -R, -r Causes directories, buckets, and bucket subdirectories to be
234 synchronized recursively. If you neglect to use this option
235 gsutil will make only the top-level directory in the source
236 and destination URLs match, skipping any sub-directories.
237 """)
238
239
240 class _DiffAction(object):
241 COPY = 'copy'
242 REMOVE = 'remove'
243
244
# Placeholder written to temp listing lines when a checksum is unavailable
# (see _BuildTmpOutputLine: "A missing field is populated with '-'").
_NA = '-'
# Output buffer size in bytes (64 KiB). NOTE(review): not referenced within
# this file's visible code -- confirm external use before removing.
_OUTPUT_BUFFER_SIZE = 64 * 1024
# Emit a progress log line for every this-many URLs listed.
_PROGRESS_REPORT_LISTING_COUNT = 10000
248
249
250 class _DiffToApply(object):
251 """Class that encapsulates info needed to apply diff for one object."""
252
253 def __init__(self, src_url_str, dst_url_str, diff_action):
254 """Constructor.
255
256 Args:
257 src_url_str: The source URL string, or None if diff_action is REMOVE.
258 dst_url_str: The destination URL string.
259 diff_action: _DiffAction to be applied.
260 """
261 self.src_url_str = src_url_str
262 self.dst_url_str = dst_url_str
263 self.diff_action = diff_action
264
265
def _DiffToApplyArgChecker(command_instance, diff_to_apply):
  """Arg checker that skips symlinks if the -e flag was specified.

  Args:
    command_instance: Command instance (read for exclude_symlinks and logger).
    diff_to_apply: The _DiffToApply being screened.

  Returns:
    False to skip a symbolic-link source when -e is in effect; True otherwise.
  """
  # REMOVE actions carry no src URL, so there is nothing to symlink-check.
  if diff_to_apply.diff_action == _DiffAction.REMOVE:
    return True
  if not command_instance.exclude_symlinks:
    return True
  exp_src_url = StorageUrlFromString(diff_to_apply.src_url_str)
  if exp_src_url.IsFileUrl() and os.path.islink(exp_src_url.object_name):
    command_instance.logger.info('Skipping symbolic link %s...', exp_src_url)
    return False
  return True
277
278
def _ComputeNeededFileChecksums(logger, src_url_str, src_size, src_crc32c,
                                src_md5, dst_url_str, dst_size, dst_crc32c,
                                dst_md5):
  """Computes any file checksums needed by _ObjectsMatch.

  A local file's checksum is only computed when there is (or will be) a
  matching checksum on the other side to compare it against.

  Args:
    logger: logging.logger for outputting log messages.
    src_url_str: Source URL string.
    src_size: Source size.
    src_crc32c: Source CRC32c.
    src_md5: Source MD5.
    dst_url_str: Destination URL string.
    dst_size: Destination size.
    dst_crc32c: Destination CRC32c.
    dst_md5: Destination MD5.

  Returns:
    (src_crc32c, src_md5, dst_crc32c, dst_md5)
  """
  src_url = StorageUrlFromString(src_url_str)
  dst_url = StorageUrlFromString(dst_url_str)
  if src_url.IsFileUrl():
    if dst_crc32c != _NA or dst_url.IsFileUrl():
      if src_size > TEN_MB:
        # Fixed log message: this branch computes CRC32C (not MD5).
        logger.info('Computing CRC32C for %s...', src_url_str)
      with open(src_url.object_name, 'rb') as fp:
        src_crc32c = CalculateB64EncodedCrc32cFromContents(fp)
    elif dst_md5 != _NA or dst_url.IsFileUrl():
      # Fixed: gate and log on the *source* file being hashed here (the
      # original checked dst_size and logged dst_url_str).
      if src_size > TEN_MB:
        logger.info('Computing MD5 for %s...', src_url_str)
      with open(src_url.object_name, 'rb') as fp:
        src_md5 = CalculateB64EncodedMd5FromContents(fp)
  if dst_url.IsFileUrl():
    if src_crc32c != _NA:
      # Fixed: gate and log on the *destination* file being hashed here (the
      # original checked src_size and logged src_url_str).
      if dst_size > TEN_MB:
        logger.info('Computing CRC32C for %s...', dst_url_str)
      with open(dst_url.object_name, 'rb') as fp:
        dst_crc32c = CalculateB64EncodedCrc32cFromContents(fp)
    elif src_md5 != _NA:
      if dst_size > TEN_MB:
        # Fixed log message: this branch computes MD5 (not CRC32C).
        logger.info('Computing MD5 for %s...', dst_url_str)
      with open(dst_url.object_name, 'rb') as fp:
        dst_md5 = CalculateB64EncodedMd5FromContents(fp)
  return (src_crc32c, src_md5, dst_crc32c, dst_md5)
323
324
def _ListUrlRootFunc(cls, args_tuple, thread_state=None):
  """Worker function for listing files/objects under a URL to be sync'd.

  Outputs sorted list to out_file_name, formatted per _BuildTmpOutputLine. We
  sort the listed URLs because we don't want to depend on consistent sort
  order across file systems and cloud providers.

  Args:
    cls: Command instance.
    args_tuple: (url_str, out_file_name, desc), where url_str is URL string to
                list; out_file_name is name of file to which sorted output
                should be written; desc is 'source' or 'destination'.
    thread_state: gsutil Cloud API instance to use.
  """
  gsutil_api = GetCloudApiInstance(cls, thread_state=thread_state)
  (url_str, out_file_name, desc) = args_tuple
  # We sort while iterating over url_str, allowing parallelism of batched
  # sorting with collecting the listing.
  # Use a context manager so the output file is closed even if listing or
  # sorting raises (the original leaked the handle on error).
  with io.open(out_file_name, mode='w', encoding=UTF8) as out_file:
    _BatchSort(_FieldedListingIterator(cls, gsutil_api, url_str, desc),
               out_file)
346
347
def _FieldedListingIterator(cls, gsutil_api, url_str, desc):
  """Iterator over url_str outputting lines formatted per _BuildTmpOutputLine.

  Args:
    cls: Command instance.
    gsutil_api: gsutil Cloud API instance to use for bucket listing.
    url_str: The URL string over which to iterate.
    desc: 'source' or 'destination'.

  Yields:
    Output line formatted per _BuildTmpOutputLine.
  """
  suffix = '/**' if cls.recursion_requested else '/*'
  wildcard = '%s%s' % (url_str.rstrip('/\\'), suffix)
  listed = 0
  object_iter = CreateWildcardIterator(
      wildcard, gsutil_api, debug=cls.debug,
      project_id=cls.project_id).IterObjects(
          # Request just the needed fields, to reduce bandwidth usage.
          bucket_listing_fields=['crc32c', 'md5Hash', 'name', 'size'])
  for blr in object_iter:
    url = blr.storage_url
    # Various GUI tools (like the GCS web console) create placeholder objects
    # ending with '/' when the user creates an empty directory. Normally these
    # tools should delete those placeholders once objects have been written
    # "under" the directory, but sometimes the placeholders are left around.
    # We need to filter them out here, otherwise if the user tries to rsync
    # from GCS to a local directory it will result in a directory/file
    # conflict (e.g., trying to download an object called "mydata/" where the
    # local directory "mydata" exists).
    if IsCloudSubdirPlaceholder(url, blr=blr):
      cls.logger.info('Skipping cloud sub-directory placeholder object %s', url)
      continue
    if (cls.exclude_symlinks and url.IsFileUrl()
        and os.path.islink(url.object_name)):
      continue
    listed += 1
    if listed % _PROGRESS_REPORT_LISTING_COUNT == 0:
      cls.logger.info('At %s listing %d...', desc, listed)
    yield _BuildTmpOutputLine(blr)
389
390
def _BuildTmpOutputLine(blr):
  """Builds line to output to temp file for given BucketListingRef.

  Args:
    blr: The BucketListingRef.

  Returns:
    The output line, formatted as quote_plus(URL)<sp>size<sp>crc32c<sp>md5
    where crc32c will only be present for GCS URLs, and md5 will only be
    present for cloud URLs that aren't composite objects. A missing field is
    populated with '-'.
  """
  url = blr.storage_url
  crc32c, md5 = _NA, _NA
  if url.IsFileUrl():
    size = os.path.getsize(url.object_name)
  elif url.IsCloudUrl():
    size = blr.root_object.size
    crc32c = blr.root_object.crc32c or _NA
    md5 = blr.root_object.md5Hash or _NA
  else:
    raise CommandException('Got unexpected URL type (%s)' % url.scheme)
  encoded_url = urllib.quote_plus(url.url_string.encode(UTF8))
  return '%s %d %s %s\n' % (encoded_url, size, crc32c, md5)
416
417
418 # pylint: disable=bare-except
# pylint: disable=bare-except
def _BatchSort(in_iter, out_file):
  """Sorts input lines from in_iter and outputs to out_file.

  Sorts in batches as input arrives, so input file does not need to be loaded
  into memory all at once. Derived from Python Recipe 466302: Sorting big
  files the Python 2.4 way by Nicolas Lehuen.

  Sorted format is per _BuildTmpOutputLine. We're sorting on the entire line
  when we could just sort on the first record (URL); but the sort order is
  identical either way.

  Args:
    in_iter: Input iterator.
    out_file: Output file.
  """
  # Note: If chunk_files gets very large we can run out of open FDs. See .boto
  # file comments about rsync_buffer_lines. If increasing rsync_buffer_lines
  # doesn't suffice (e.g., for someone synchronizing with a really large
  # bucket), an option would be to make gsutil merge in passes, never
  # opening all chunk files simultaneously.
  buffer_size = config.getint('GSUtil', 'rsync_buffer_lines', 32000)
  chunk_files = []
  try:
    while True:
      batch = sorted(islice(in_iter, buffer_size))
      if not batch:
        break
      chunk_name = '%s-%06i' % (out_file.name, len(chunk_files))
      chunk = io.open(chunk_name, mode='w+', encoding=UTF8)
      chunk_files.append(chunk)
      chunk.writelines(unicode(''.join(batch)))
      chunk.flush()
      chunk.seek(0)
    # k-way merge of the (individually sorted) chunk files into out_file.
    out_file.writelines(heapq.merge(*chunk_files))
  except IOError as e:
    if e.errno == errno.EMFILE:
      raise CommandException('\n'.join(textwrap.wrap(
          'Synchronization failed because too many open file handles were '
          'needed while building synchronization state. Please see the '
          'comments about rsync_buffer_lines in your .boto config file for a '
          'possible way to address this problem.')))
    raise
  finally:
    # Best-effort cleanup of every chunk file.
    for chunk_file in chunk_files:
      try:
        chunk_file.close()
        os.remove(chunk_file.name)
      except:
        pass
468
469
class _DiffIterator(object):
  """Iterator yielding sequence of _DiffToApply objects.

  Builds sorted listings of the source and destination hierarchies in temp
  files (via _ListUrlRootFunc), then streams through both sorted lists in
  lockstep to decide, per URL, whether to copy, remove, or skip.
  """

  def __init__(self, command_obj, base_src_url, base_dst_url):
    """Instantiates the iterator and builds the sorted listing temp files.

    Args:
      command_obj: The Command instance (supplies options, logger, and Apply).
      base_src_url: Base source URL to synchronize from.
      base_dst_url: Base destination URL to synchronize to.
    """
    self.command_obj = command_obj
    self.compute_checksums = command_obj.compute_checksums
    self.delete_extras = command_obj.delete_extras
    self.recursion_requested = command_obj.recursion_requested
    self.logger = self.command_obj.logger
    self.base_src_url = base_src_url
    self.base_dst_url = base_dst_url
    self.logger.info('Building synchronization state...')

    (src_fh, self.sorted_list_src_file_name) = tempfile.mkstemp(
        prefix='gsutil-rsync-src-')
    (dst_fh, self.sorted_list_dst_file_name) = tempfile.mkstemp(
        prefix='gsutil-rsync-dst-')
    # Close the file handles; the file will be opened in write mode by
    # _ListUrlRootFunc.
    os.close(src_fh)
    os.close(dst_fh)

    # Build sorted lists of src and dst URLs in parallel. To do this, pass args
    # to _ListUrlRootFunc as tuple (url_str, out_file_name, desc).
    args_iter = iter([
        (self.base_src_url.url_string, self.sorted_list_src_file_name,
         'source'),
        (self.base_dst_url.url_string, self.sorted_list_dst_file_name,
         'destination')
    ])
    command_obj.Apply(_ListUrlRootFunc, args_iter, _RootListingExceptionHandler,
                      arg_checker=DummyArgChecker,
                      parallel_operations_override=True,
                      fail_on_error=True)

    self.sorted_list_src_file = open(self.sorted_list_src_file_name, 'r')
    self.sorted_list_dst_file = open(self.sorted_list_dst_file_name, 'r')

    # Wrap iterators in PluralityCheckableIterator so we can check emptiness.
    self.sorted_src_urls_it = PluralityCheckableIterator(
        iter(self.sorted_list_src_file))
    self.sorted_dst_urls_it = PluralityCheckableIterator(
        iter(self.sorted_list_dst_file))

  # pylint: disable=bare-except
  def CleanUpTempFiles(self):
    """Cleans up temp files.

    This function allows the main (RunCommand) function to clean up at end of
    operation. This is necessary because tempfile.NamedTemporaryFile doesn't
    allow the created file to be re-opened in read mode on Windows, so we have
    to use tempfile.mkstemp, which doesn't automatically delete temp files (see
    https://mail.python.org/pipermail/python-list/2005-December/336958.html).
    """
    # Best-effort cleanup: failures here (e.g., file already removed) are
    # deliberately swallowed since the sync result is unaffected.
    try:
      self.sorted_list_src_file.close()
      self.sorted_list_dst_file.close()
      for fname in (self.sorted_list_src_file_name,
                    self.sorted_list_dst_file_name):
        os.unlink(fname)
    except:
      pass

  def _ParseTmpFileLine(self, line):
    """Parses output from _BuildTmpOutputLine.

    Parses into tuple:
      (URL, size, crc32c, md5)
    where crc32c and/or md5 can be _NA.

    Args:
      line: The line to parse.

    Returns:
      Parsed tuple: (url, size, crc32c, md5)
    """
    (encoded_url, size, crc32c, md5) = line.split()
    # URL was quote_plus-encoded when written; decode it back to unicode.
    return (urllib.unquote_plus(encoded_url).decode(UTF8),
            int(size), crc32c, md5.strip())

  def _WarnIfMissingCloudHash(self, url_str, crc32c, md5):
    """Warns if given url_str is a cloud URL and is missing both crc32c and md5.

    Args:
      url_str: Destination URL string.
      crc32c: Destination CRC32c.
      md5: Destination MD5.

    Returns:
      True if issued warning.
    """
    # One known way this can currently happen is when rsync'ing objects larger
    # than 5GB from S3 (for which the etag is not an MD5).
    if (StorageUrlFromString(url_str).IsCloudUrl()
        and crc32c == _NA and md5 == _NA):
      self.logger.warn(
          'Found no hashes to validate %s. Integrity cannot be assured without '
          'hashes.', url_str)
      return True
    return False

  def _ObjectsMatch(self, src_url_str, src_size, src_crc32c, src_md5,
                    dst_url_str, dst_size, dst_crc32c, dst_md5):
    """Returns True if src and dst objects are the same.

    Uses size plus whatever checksums are available.

    Args:
      src_url_str: Source URL string.
      src_size: Source size.
      src_crc32c: Source CRC32c.
      src_md5: Source MD5.
      dst_url_str: Destination URL string.
      dst_size: Destination size.
      dst_crc32c: Destination CRC32c.
      dst_md5: Destination MD5.

    Returns:
      True/False.
    """
    # Note: This function is called from __iter__, which is called from the
    # Command.Apply driver. Thus, all checksum computation will be run in a
    # single thread, which is good (having multiple threads concurrently
    # computing checksums would thrash the disk).
    if src_size != dst_size:
      return False
    if self.compute_checksums:
      (src_crc32c, src_md5, dst_crc32c, dst_md5) = _ComputeNeededFileChecksums(
          self.logger, src_url_str, src_size, src_crc32c, src_md5, dst_url_str,
          dst_size, dst_crc32c, dst_md5)
    # Prefer MD5 comparison when both sides have one; fall back to CRC32C.
    if src_md5 != _NA and dst_md5 != _NA:
      self.logger.debug('Comparing md5 for %s and %s', src_url_str, dst_url_str)
      return src_md5 == dst_md5
    if src_crc32c != _NA and dst_crc32c != _NA:
      self.logger.debug(
          'Comparing crc32c for %s and %s', src_url_str, dst_url_str)
      return src_crc32c == dst_crc32c
    if not self._WarnIfMissingCloudHash(src_url_str, src_crc32c, src_md5):
      self._WarnIfMissingCloudHash(dst_url_str, dst_crc32c, dst_md5)
    # Without checksums to compare we depend only on basic size comparison.
    return True

  def __iter__(self):
    """Iterates over src/dst URLs and produces a _DiffToApply sequence.

    Walks the two sorted listings in lockstep (classic sorted-merge): a URL
    present only on the src side yields COPY; one present only on the dst
    side yields REMOVE (when -d); one present on both sides yields COPY only
    if the objects differ per _ObjectsMatch.

    Yields:
      The _DiffToApply.
    """
    # Strip trailing slashes, if any, so we compute tail length against
    # consistent position regardless of whether trailing slashes were included
    # or not in URL.
    base_src_url_len = len(self.base_src_url.url_string.rstrip('/\\'))
    base_dst_url_len = len(self.base_dst_url.url_string.rstrip('/\\'))
    src_url_str = dst_url_str = None
    # Invariant: After each yield, the URLs in src_url_str, dst_url_str,
    # self.sorted_src_urls_it, and self.sorted_dst_urls_it are not yet
    # processed. Each time we encounter None in src_url_str or dst_url_str we
    # populate from the respective iterator, and we reset one or the other value
    # to None after yielding an action that disposes of that URL.
    while not self.sorted_src_urls_it.IsEmpty() or src_url_str is not None:
      if src_url_str is None:
        (src_url_str, src_size, src_crc32c, src_md5) = self._ParseTmpFileLine(
            self.sorted_src_urls_it.next())
        # Skip past base URL and normalize slashes so we can compare across
        # clouds/file systems (including Windows).
        src_url_str_to_check = src_url_str[base_src_url_len:].replace('\\', '/')
        dst_url_str_would_copy_to = copy_helper.ConstructDstUrl(
            self.base_src_url, StorageUrlFromString(src_url_str), True, True,
            self.base_dst_url, False, self.recursion_requested).url_string
      if self.sorted_dst_urls_it.IsEmpty():
        # We've reached end of dst URLs, so copy src to dst.
        yield _DiffToApply(
            src_url_str, dst_url_str_would_copy_to, _DiffAction.COPY)
        src_url_str = None
        continue
      if not dst_url_str:
        (dst_url_str, dst_size, dst_crc32c, dst_md5) = (
            self._ParseTmpFileLine(self.sorted_dst_urls_it.next()))
        # Skip past base URL and normalize slashes so we can compare across
        # clouds/file systems (including Windows).
        dst_url_str_to_check = dst_url_str[base_dst_url_len:].replace('\\', '/')

      if src_url_str_to_check < dst_url_str_to_check:
        # There's no dst object corresponding to src object, so copy src to dst.
        yield _DiffToApply(
            src_url_str, dst_url_str_would_copy_to, _DiffAction.COPY)
        src_url_str = None
      elif src_url_str_to_check > dst_url_str_to_check:
        # dst object without a corresponding src object, so remove dst if -d
        # option was specified.
        if self.delete_extras:
          yield _DiffToApply(None, dst_url_str, _DiffAction.REMOVE)
        dst_url_str = None
      else:
        # There is a dst object corresponding to src object, so check if objects
        # match.
        if self._ObjectsMatch(
            src_url_str, src_size, src_crc32c, src_md5,
            dst_url_str, dst_size, dst_crc32c, dst_md5):
          # Continue iterating without yielding a _DiffToApply.
          src_url_str = None
          dst_url_str = None
        else:
          yield _DiffToApply(src_url_str, dst_url_str, _DiffAction.COPY)
          dst_url_str = None

    # If -d option specified any files/objects left in dst iteration should be
    # removed.
    if not self.delete_extras:
      return
    if dst_url_str:
      yield _DiffToApply(None, dst_url_str, _DiffAction.REMOVE)
      dst_url_str = None
    for line in self.sorted_dst_urls_it:
      (dst_url_str, _, _, _) = self._ParseTmpFileLine(line)
      yield _DiffToApply(None, dst_url_str, _DiffAction.REMOVE)
686
687
def _RsyncFunc(cls, diff_to_apply, thread_state=None):
  """Worker function for performing the actual copy and remove operations.

  Args:
    cls: Command instance.
    diff_to_apply: The _DiffToApply to execute.
    thread_state: gsutil Cloud API instance to use.

  Raises:
    CommandException: if diff_to_apply carries an unknown diff_action.
  """
  gsutil_api = GetCloudApiInstance(cls, thread_state=thread_state)
  dst_url_str = diff_to_apply.dst_url_str
  dst_url = StorageUrlFromString(dst_url_str)
  if diff_to_apply.diff_action == _DiffAction.REMOVE:
    if cls.dryrun:
      cls.logger.info('Would remove %s', dst_url)
    else:
      cls.logger.info('Removing %s', dst_url)
      if dst_url.IsFileUrl():
        os.unlink(dst_url.object_name)
      else:
        try:
          gsutil_api.DeleteObject(
              dst_url.bucket_name, dst_url.object_name,
              generation=dst_url.generation, provider=dst_url.scheme)
        except NotFoundException:
          # If the object happened to be deleted by an external process, this
          # is fine because it moves us closer to the desired state.
          pass
  elif diff_to_apply.diff_action == _DiffAction.COPY:
    src_url_str = diff_to_apply.src_url_str
    src_url = StorageUrlFromString(src_url_str)
    if cls.dryrun:
      cls.logger.info('Would copy %s to %s', src_url, dst_url)
    else:
      copy_helper.PerformCopy(cls.logger, src_url, dst_url, gsutil_api, cls,
                              _RsyncExceptionHandler,
                              headers=cls.headers)
  else:
    # Fixed: diff_action values are strings ('copy'/'remove'), so the
    # original '%d' raised a TypeError here instead of reporting the
    # unexpected action. Use '%s'.
    raise CommandException('Got unexpected DiffAction (%s)'
                           % diff_to_apply.diff_action)
721
722
723 def _RootListingExceptionHandler(cls, e):
724 """Simple exception handler for exceptions during listing URLs to sync."""
725 cls.logger.error(str(e))
726
727
728 def _RsyncExceptionHandler(cls, e):
729 """Simple exception handler to allow post-completion status."""
730 cls.logger.error(str(e))
731 cls.op_failure_count += 1
732 cls.logger.debug('\n\nEncountered exception while syncing:\n%s\n',
733 traceback.format_exc())
734
735
class RsyncCommand(Command):
  """Implementation of gsutil rsync command."""

  # Command specification. See base class for documentation.
  command_spec = Command.CreateCommandSpec(
      'rsync',
      command_name_aliases=[],
      min_args=2,
      max_args=2,
      supported_sub_args='cCdenprR',
      file_url_ok=True,
      provider_url_ok=False,
      urls_start_arg=0,
      gs_api_support=[ApiSelector.XML, ApiSelector.JSON],
      gs_default_api=ApiSelector.JSON,
  )
  # Help specification. See help_provider.py for documentation.
  help_spec = Command.HelpSpec(
      help_name='rsync',
      help_name_aliases=['sync', 'synchronize'],
      help_type='command_help',
      help_one_line_summary='Synchronize content of two buckets/directories',
      help_text=_DETAILED_HELP_TEXT,
      subcommand_help_text={},
  )
  # NOTE(review): not referenced within this class's visible code -- confirm
  # whether anything external reads it before removing.
  total_bytes_transferred = 0

  def _InsistContainer(self, url_str):
    """Sanity checks that URL names an existing container.

    Args:
      url_str: URL string to check.

    Returns:
      URL for checked string.

    Raises:
      CommandException: if url_str doesn't name an existing container.
    """
    (url, have_existing_container) = (
        copy_helper.ExpandUrlToSingleBlr(url_str, self.gsutil_api, self.debug,
                                         self.project_id))
    if not have_existing_container:
      raise CommandException(
          'arg (%s) does not name a directory, bucket, or bucket subdir.'
          % url_str)
    return url

  def RunCommand(self):
    """Command entry point for the rsync command."""
    self._ParseOpts()
    # Warn up front if checksumming would fall back to the slow pure-Python
    # CRC implementation.
    if self.compute_checksums and not UsingCrcmodExtension(crcmod):
      self.logger.warn(SLOW_CRCMOD_WARNING)

    src_url = self._InsistContainer(self.args[0])
    dst_url = self._InsistContainer(self.args[1])

    # Tracks if any copy or rm operations failed.
    self.op_failure_count = 0

    # List of attributes to share/manage across multiple processes in
    # parallel (-m) mode.
    shared_attrs = ['op_failure_count']

    # Perform sync requests in parallel (-m) mode, if requested, using
    # configured number of parallel processes and threads. Otherwise,
    # perform requests with sequential function calls in current process.
    diff_iterator = _DiffIterator(self, src_url, dst_url)
    self.logger.info('Starting synchronization')
    try:
      self.Apply(_RsyncFunc, diff_iterator, _RsyncExceptionHandler,
                 shared_attrs, arg_checker=_DiffToApplyArgChecker,
                 fail_on_error=True)
    finally:
      diff_iterator.CleanUpTempFiles()

    if self.op_failure_count:
      # Fixed: pluralize only when more than one operation failed. The
      # original tested truthiness inside this branch, so it always produced
      # 's' (e.g., "1 files/objects could not be copied/removed.").
      plural_str = 's' if self.op_failure_count > 1 else ''
      raise CommandException(
          '%d file%s/object%s could not be copied/removed.' %
          (self.op_failure_count, plural_str, plural_str))

  def _ParseOpts(self):
    """Parses sub-options into Command state.

    Returns:
      CopyHelperOpts reflecting the parsed -p (preserve ACL) setting.
    """
    # exclude_symlinks is handled by Command parent class, so save in Command
    # state rather than CopyHelperOpts.
    self.exclude_symlinks = False
    # continue_on_error is handled by Command parent class, so save in Command
    # state rather than CopyHelperOpts.
    self.continue_on_error = False
    self.delete_extras = False
    preserve_acl = False
    self.compute_checksums = False
    self.dryrun = False
    # self.recursion_requested is initialized in command.py (so it can be
    # checked in parent class for all commands).

    if self.sub_opts:
      for o, _ in self.sub_opts:
        if o == '-c':
          self.compute_checksums = True
        # Note: In gsutil cp command this is specified using -c but here we use
        # -C so we can use -c for checksum arg (to be consistent with Unix rsync
        # command options).
        elif o == '-C':
          self.continue_on_error = True
        elif o == '-d':
          self.delete_extras = True
        elif o == '-e':
          self.exclude_symlinks = True
        elif o == '-n':
          self.dryrun = True
        elif o == '-p':
          preserve_acl = True
        elif o == '-r' or o == '-R':
          self.recursion_requested = True
    return CreateCopyHelperOpts(preserve_acl=preserve_acl)
OLDNEW
« no previous file with comments | « gslib/commands/rm.py ('k') | gslib/commands/setmeta.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698