# -*- coding: utf-8 -*-
# Copyright 2014 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Implementation of Unix-like rsync command."""

from __future__ import absolute_import

import errno
import heapq
import io
from itertools import islice
import os
import re
import tempfile
import textwrap
import traceback
import urllib

from boto import config
import crcmod

from gslib import copy_helper
from gslib.cloud_api import NotFoundException
from gslib.command import Command
from gslib.command import DummyArgChecker
from gslib.command_argument import CommandArgument
from gslib.copy_helper import CreateCopyHelperOpts
from gslib.copy_helper import SkipUnsupportedObjectError
from gslib.cs_api_map import ApiSelector
from gslib.exception import CommandException
from gslib.hashing_helper import CalculateB64EncodedCrc32cFromContents
from gslib.hashing_helper import CalculateB64EncodedMd5FromContents
from gslib.hashing_helper import SLOW_CRCMOD_WARNING
from gslib.plurality_checkable_iterator import PluralityCheckableIterator
from gslib.sig_handling import GetCaughtSignals
from gslib.sig_handling import RegisterSignalHandler
from gslib.storage_url import StorageUrlFromString
from gslib.util import GetCloudApiInstance
from gslib.util import IsCloudSubdirPlaceholder
from gslib.util import TEN_MIB
from gslib.util import UsingCrcmodExtension
from gslib.util import UTF8
from gslib.wildcard_iterator import CreateWildcardIterator


_SYNOPSIS = """
  gsutil rsync [-c] [-C] [-d] [-e] [-n] [-p] [-r] [-U] [-x] src_url dst_url
"""

_DETAILED_HELP_TEXT = ("""
<B>SYNOPSIS</B>
""" + _SYNOPSIS + """


<B>DESCRIPTION</B>
  The gsutil rsync command makes the contents under dst_url the same as the
  contents under src_url, by copying any missing files/objects, and (if the
  -d option is specified) deleting any extra files/objects. For example, to
  make gs://mybucket/data match the contents of the local directory "data"
  you could do:

    gsutil rsync -d data gs://mybucket/data

  To recurse into directories use the -r option:

    gsutil rsync -d -r data gs://mybucket/data

  To copy only new/changed files without deleting extra files from
  gs://mybucket/data leave off the -d option:

    gsutil rsync -r data gs://mybucket/data

  If you have a large number of objects to synchronize you might want to use
  the gsutil -m option, to perform parallel (multi-threaded/multi-processing)
  synchronization:

    gsutil -m rsync -d -r data gs://mybucket/data

  The -m option typically will provide a large performance boost if either
  the source or destination (or both) is a cloud URL. If both source and
  destination are file URLs the -m option will typically thrash the disk and
  slow synchronization down.

  To make the local directory "data" the same as the contents of
  gs://mybucket/data:

    gsutil rsync -d -r gs://mybucket/data data

  To make the contents of gs://mybucket2 the same as gs://mybucket1:

    gsutil rsync -d -r gs://mybucket1 gs://mybucket2

  You can also mirror data across local directories:

    gsutil rsync -d -r dir1 dir2

  To mirror your content across clouds:

    gsutil rsync -d -r gs://my-gs-bucket s3://my-s3-bucket

  Note: If you are synchronizing a large amount of data between clouds you
  might consider setting up a
  `Google Compute Engine <https://cloud.google.com/products/compute-engine>`_
  account and running gsutil there. Since cross-provider gsutil data
  transfers flow through the machine where gsutil is running, doing this can
  make your transfer run significantly faster than running gsutil on your
  local workstation.


<B>BE CAREFUL WHEN USING -d OPTION!</B>
  The rsync -d option is very useful and commonly used, because it provides a
  means of making the contents of a destination bucket or directory match
  those of a source bucket or directory. However, please exercise caution
  when you use this option: It's possible to delete large amounts of data
  accidentally if, for example, you erroneously reverse source and
  destination. For example, if you meant to synchronize a local directory
  from a bucket in the cloud but instead run the command:

    gsutil -m rsync -r -d ./your-dir gs://your-bucket

  and your-dir is currently empty, you will quickly delete all of the objects
  in gs://your-bucket.

  You can also cause large amounts of data to be lost quickly by specifying a
  subdirectory of the destination as the source of an rsync. For example, the
  command:

    gsutil -m rsync -r -d gs://your-bucket/data gs://your-bucket

  would cause most or all of the objects in gs://your-bucket to be deleted
  (some objects may survive if there are any with names that sort lower than
  "data" under gs://your-bucket/data).

  In addition to paying careful attention to the source and destination you
  specify with the rsync command, there are two more safety measures you can
  take when using gsutil rsync -d:

  1. Try running the command with the rsync -n option first, to see what it
     would do without actually performing the operations. For example, if
     you run the command:

       gsutil -m rsync -r -d -n gs://your-bucket/data gs://your-bucket

     it will be immediately evident that running that command without the -n
     option would cause many objects to be deleted.

  2. Enable object versioning in your bucket, which will allow you to restore
     objects if you accidentally delete them. For more details see
     "gsutil help versions".


<B>IMPACT OF BUCKET LISTING EVENTUAL CONSISTENCY</B>
  The rsync command operates by listing the source and destination URLs, and
  then performing copy and remove operations according to the differences
  between these listings. Because bucket listing is eventually (not strongly)
  consistent, if you upload new objects or delete objects from a bucket and
  then immediately run gsutil rsync with that bucket as the source or
  destination, it's possible the rsync command will not see the recent
  updates and thus synchronize incorrectly. You can rerun the rsync operation
  later to correct any such incorrect synchronization.


<B>CHECKSUM VALIDATION AND FAILURE HANDLING</B>
  At the end of every upload or download, the gsutil rsync command validates
  that the checksum of the source file/object matches the checksum of the
  destination file/object. If the checksums do not match, gsutil will delete
  the invalid copy and print a warning message. This very rarely happens, but
  if it does, please contact gs-team@google.com.

  The rsync command will retry when failures occur, but if enough failures
  happen during a particular copy or delete operation the command will skip
  that object and move on. At the end of the synchronization run, if any
  failures were not successfully retried, the rsync command will report the
  count of failures and exit with non-zero status. At this point you can run
  the rsync command again, and it will attempt any remaining needed copy
  and/or delete operations.

  Note that there are cases where retrying will never succeed, such as if you
  don't have write permission to the destination bucket or if the destination
  path for some objects is longer than the maximum allowed length.

  For more details about gsutil's retry handling, please see
  "gsutil help retries".


<B>CHANGE DETECTION ALGORITHM</B>
  To determine if a file or object has changed, gsutil rsync first checks
  whether the source and destination sizes match. If they match, it next
  compares their checksums, using whichever checksums are available (see
  below). Unlike the Unix rsync command, gsutil rsync does not use timestamps
  to determine if the file/object changed, because the GCS API does not
  permit the caller to set an object's timestamp (hence, timestamps of
  identical files/objects cannot be made to match).

  Checksums will not be available in two cases:

  1. When synchronizing to or from a file system. By default, gsutil does not
     checksum files, because of the slowdown caused when working with large
     files. You can cause gsutil to checksum files by using the gsutil
     rsync -c option, at the cost of increased local disk I/O and run time
     when working with large files. You should consider using the -c option
     if your files can change without changing sizes (e.g., if you have files
     that contain fixed width data, such as timestamps).

  2. When comparing composite GCS objects with objects at a cloud provider
     that does not support CRC32C (which is the only checksum available for
     composite objects). See 'gsutil help compose' for details about
     composite objects.


<B>COPYING IN THE CLOUD AND METADATA PRESERVATION</B>
  If both the source and destination URL are cloud URLs from the same
  provider, gsutil copies data "in the cloud" (i.e., without downloading to
  and uploading from the machine where you run gsutil). In addition to the
  performance and cost advantages of doing this, copying in the cloud
  preserves metadata (like Content-Type and Cache-Control). In contrast, when
  you download data from the cloud it ends up in a file, which has no
  associated metadata. Thus, unless you have some way to hold on to or
  re-create that metadata, synchronizing a bucket to a directory in the local
  file system will not retain the metadata.

  Note that by default, the gsutil rsync command does not copy the ACLs of
  objects being synchronized and instead will use the default bucket ACL (see
  "gsutil help defacl"). You can override this behavior with the -p option
  (see OPTIONS below).


<B>SLOW CHECKSUMS</B>
  If you find that CRC32C checksum computation runs slowly, this is likely
  because you don't have a compiled CRC32C implementation on your system. Try
  running:

    gsutil ver -l

  If the output contains:

    compiled crcmod: False

  you are running a Python library for computing CRC32C, which is much slower
  than using the compiled code. For information on getting a compiled CRC32C
  implementation, see 'gsutil help crc32c'.


<B>LIMITATIONS</B>
  1. The gsutil rsync command doesn't make the destination object's
     timestamps match those of the source object (it can't; timestamp setting
     is not allowed by the GCS API).

  2. The gsutil rsync command ignores versioning, synchronizing only the live
     object versions in versioned buckets.


<B>OPTIONS</B>
  -c          Causes the rsync command to compute checksums for files if the
              size of source and destination match, and then compare
              checksums. This option increases local disk I/O and run time
              if either src_url or dst_url are on the local file system.

  -C          If an error occurs, continue to attempt to copy the remaining
              files. If errors occurred, gsutil's exit status will be
              non-zero even if this flag is set. This option is implicitly
              set when running "gsutil -m rsync...". Note: -C only applies
              to the actual copying operation. If an error occurs while
              iterating over the files in the local directory (e.g., invalid
              Unicode file name) gsutil will print an error message and
              abort.

  -d          Delete extra files under dst_url not found under src_url. By
              default extra files are not deleted. Note: this option can
              delete data quickly if you specify the wrong source/destination
              combination. See the help section above,
              "BE CAREFUL WHEN USING -d OPTION!".

  -e          Exclude symlinks. When specified, symbolic links will be
              ignored.

  -n          Causes rsync to run in "dry run" mode, i.e., just outputting
              what would be copied or deleted without actually doing any
              copying/deleting.

  -p          Causes ACLs to be preserved when synchronizing in the cloud.
              Note that this option has performance and cost implications
              when using the XML API, as it requires separate HTTP calls for
              interacting with ACLs. The performance issue can be mitigated
              to some degree by using gsutil -m rsync to cause parallel
              synchronization. Also, this option only works if you have OWNER
              access to all of the objects that are copied.

              You can avoid the additional performance and cost of using
              rsync -p if you want all objects in the destination bucket to
              end up with the same ACL: set a default object ACL on that
              bucket instead of using rsync -p. See "gsutil help defacl".

  -R, -r      Causes directories, buckets, and bucket subdirectories to be
              synchronized recursively. If you neglect to use this option
              gsutil will make only the top-level directory in the source
              and destination URLs match, skipping any sub-directories.

  -U          Skip objects with unsupported object types instead of failing.
              Unsupported object types are Amazon S3 objects in the GLACIER
              storage class.

  -x pattern  Causes files/objects matching pattern to be excluded, i.e., any
              matching files/objects will not be copied or deleted. Note that
              the pattern is a Python regular expression, not a wildcard (so,
              matching any string ending in 'abc' would be specified using
              '.*abc' rather than '*abc'). Note also that the exclude path is
              always relative (similar to Unix rsync or tar exclude options).
              For example, if you run the command:

                gsutil rsync -x 'data./.*\\.txt' dir gs://my-bucket

              it will skip the file dir/data1/a.txt.

              You can use regex alternation to specify multiple exclusions,
              for example:

                gsutil rsync -x '.*\\.txt|.*\\.jpg' dir gs://my-bucket
""")


class _DiffAction(object):
  COPY = 'copy'
  REMOVE = 'remove'

# Sentinel written in listing lines when a field (e.g., a checksum) is
# unavailable.
_NA = '-'
_OUTPUT_BUFFER_SIZE = 64 * 1024
# Report progress to the logger after each multiple of this many listed URLs.
_PROGRESS_REPORT_LISTING_COUNT = 10000


# Tracks files we need to clean up at end or if interrupted.
_tmp_files = []


# pylint: disable=unused-argument
def _HandleSignals(signal_num, cur_stack_frame):
  """Called when rsync command is killed with SIGINT, SIGQUIT or SIGTERM."""
  CleanUpTempFiles()


def CleanUpTempFiles():
  """Cleans up temp files.

  This function allows the main (RunCommand) function to clean up at end of
  operation, or if gsutil rsync is interrupted (e.g., via ^C). This is
  necessary because tempfile.NamedTemporaryFile doesn't allow the created
  file to be re-opened in read mode on Windows, so we have to use
  tempfile.mkstemp, which doesn't automatically delete temp files.
  """
  try:
    for fname in _tmp_files:
      os.unlink(fname)
  except:  # pylint: disable=bare-except
    pass


class _DiffToApply(object):
  """Class that encapsulates info needed to apply diff for one object."""

  def __init__(self, src_url_str, dst_url_str, diff_action):
    """Constructor.

    Args:
      src_url_str: The source URL string, or None if diff_action is REMOVE.
      dst_url_str: The destination URL string.
      diff_action: _DiffAction to be applied.
    """
    self.src_url_str = src_url_str
    self.dst_url_str = dst_url_str
    self.diff_action = diff_action


def _DiffToApplyArgChecker(command_instance, diff_to_apply):
  """Arg checker that skips symlinks if -e flag specified."""
  if (diff_to_apply.diff_action == _DiffAction.REMOVE
      or not command_instance.exclude_symlinks):
    # No src URL is populated for REMOVE actions.
    return True
  exp_src_url = StorageUrlFromString(diff_to_apply.src_url_str)
  if exp_src_url.IsFileUrl() and os.path.islink(exp_src_url.object_name):
    command_instance.logger.info('Skipping symbolic link %s...', exp_src_url)
    return False
  return True


def _ComputeNeededFileChecksums(logger, src_url_str, src_size, src_crc32c,
                                src_md5, dst_url_str, dst_size, dst_crc32c,
                                dst_md5):
  """Computes any file checksums needed by _ObjectsMatch.

  Args:
    logger: logging.logger for outputting log messages.
    src_url_str: Source URL string.
    src_size: Source size.
    src_crc32c: Source CRC32c.
    src_md5: Source MD5.
    dst_url_str: Destination URL string.
    dst_size: Destination size.
    dst_crc32c: Destination CRC32c.
    dst_md5: Destination MD5.

  Returns:
    (src_crc32c, src_md5, dst_crc32c, dst_md5)
  """
  src_url = StorageUrlFromString(src_url_str)
  dst_url = StorageUrlFromString(dst_url_str)
  if src_url.IsFileUrl():
    if dst_crc32c != _NA or dst_url.IsFileUrl():
      if src_size > TEN_MIB:
        logger.info('Computing CRC32C for %s...', src_url_str)
      with open(src_url.object_name, 'rb') as fp:
        src_crc32c = CalculateB64EncodedCrc32cFromContents(fp)
    elif dst_md5 != _NA or dst_url.IsFileUrl():
      if src_size > TEN_MIB:
        logger.info('Computing MD5 for %s...', src_url_str)
      with open(src_url.object_name, 'rb') as fp:
        src_md5 = CalculateB64EncodedMd5FromContents(fp)
  if dst_url.IsFileUrl():
    if src_crc32c != _NA:
      if dst_size > TEN_MIB:
        logger.info('Computing CRC32C for %s...', dst_url_str)
      with open(dst_url.object_name, 'rb') as fp:
        dst_crc32c = CalculateB64EncodedCrc32cFromContents(fp)
    elif src_md5 != _NA:
      if dst_size > TEN_MIB:
        logger.info('Computing MD5 for %s...', dst_url_str)
      with open(dst_url.object_name, 'rb') as fp:
        dst_md5 = CalculateB64EncodedMd5FromContents(fp)
  return (src_crc32c, src_md5, dst_crc32c, dst_md5)


def _ListUrlRootFunc(cls, args_tuple, thread_state=None):
  """Worker function for listing files/objects under a URL to be sync'd.

  Outputs sorted list to out_filename, formatted per _BuildTmpOutputLine. We
  sort the listed URLs because we don't want to depend on consistent sort
  order across file systems and cloud providers.

  Args:
    cls: Command instance.
    args_tuple: (base_url_str, out_filename, desc), where base_url_str is the
                top-level URL string to list; out_filename is the name of the
                file to which the sorted output should be written; desc is
                'source' or 'destination'.
    thread_state: gsutil Cloud API instance to use.
  """
  gsutil_api = GetCloudApiInstance(cls, thread_state=thread_state)
  (base_url_str, out_filename, desc) = args_tuple
  # We sort while iterating over base_url_str, allowing parallelism of batched
  # sorting with collecting the listing.
  out_file = io.open(out_filename, mode='w', encoding=UTF8)
  try:
    _BatchSort(_FieldedListingIterator(cls, gsutil_api, base_url_str, desc),
               out_file)
  except Exception as e:  # pylint: disable=broad-except
    # Abandon rsync if an exception percolates up to this layer - retryable
    # exceptions are handled in the lower layers, so we got a non-retryable
    # exception (like 404 bucket not found) and proceeding would either be
    # futile or could result in data loss - for example:
    #   gsutil rsync -d gs://non-existent-bucket ./localdir
    # would delete files from localdir.
    cls.logger.error(
        'Caught non-retryable exception while listing %s: %s' %
        (base_url_str, e))
    cls.non_retryable_listing_failures = 1
  out_file.close()


def _FieldedListingIterator(cls, gsutil_api, base_url_str, desc):
  """Iterator over base_url_str formatting output per _BuildTmpOutputLine.

  Args:
    cls: Command instance.
    gsutil_api: gsutil Cloud API instance to use for bucket listing.
    base_url_str: The top-level URL string over which to iterate.
    desc: 'source' or 'destination'.

  Yields:
    Output line formatted per _BuildTmpOutputLine.
  """
  if cls.recursion_requested:
    wildcard = '%s/**' % base_url_str.rstrip('/\\')
  else:
    wildcard = '%s/*' % base_url_str.rstrip('/\\')
  i = 0
  for blr in CreateWildcardIterator(
      wildcard, gsutil_api, debug=cls.debug,
      project_id=cls.project_id).IterObjects(
          # Request just the needed fields, to reduce bandwidth usage.
          bucket_listing_fields=['crc32c', 'md5Hash', 'name', 'size']):
    # Various GUI tools (like the GCS web console) create placeholder objects
    # ending with '/' when the user creates an empty directory. Normally these
    # tools should delete those placeholders once objects have been written
    # "under" the directory, but sometimes the placeholders are left around.
    # We need to filter them out here, otherwise if the user tries to rsync
    # from GCS to a local directory it will result in a directory/file
    # conflict (e.g., trying to download an object called "mydata/" where the
    # local directory "mydata" exists).
    url = blr.storage_url
    if IsCloudSubdirPlaceholder(url, blr=blr):
      cls.logger.info('Skipping cloud sub-directory placeholder object (%s) '
                      'because such objects aren\'t needed in (and would '
                      'interfere with) directories in the local file system',
                      url)
      continue
    if (cls.exclude_symlinks and url.IsFileUrl()
        and os.path.islink(url.object_name)):
      continue
    if cls.exclude_pattern:
      str_to_check = url.url_string[len(base_url_str):]
      if str_to_check.startswith(url.delim):
        str_to_check = str_to_check[1:]
      if cls.exclude_pattern.match(str_to_check):
        continue
    i += 1
    if i % _PROGRESS_REPORT_LISTING_COUNT == 0:
      cls.logger.info('At %s listing %d...', desc, i)
    yield _BuildTmpOutputLine(blr)


def _BuildTmpOutputLine(blr):
  """Builds line to output to temp file for given BucketListingRef.

  Args:
    blr: The BucketListingRef.

  Returns:
    The output line, formatted as _EncodeUrl(URL)<sp>size<sp>crc32c<sp>md5
    where crc32c will only be present for GCS URLs, and md5 will only be
    present for cloud URLs that aren't composite objects. A missing field is
    populated with '-'.
  """
  crc32c = _NA
  md5 = _NA
  url = blr.storage_url
  if url.IsFileUrl():
    size = os.path.getsize(url.object_name)
  elif url.IsCloudUrl():
    size = blr.root_object.size
    crc32c = blr.root_object.crc32c or _NA
    md5 = blr.root_object.md5Hash or _NA
  else:
    raise CommandException('Got unexpected URL type (%s)' % url.scheme)
  return '%s %d %s %s\n' % (_EncodeUrl(url.url_string), size, crc32c, md5)


def _EncodeUrl(url_string):
  """Encodes url_string with quote-plus encoding and UTF8 character encoding.

  We use this for all URL encodings.

  Args:
    url_string: String URL to encode.

  Returns:
    Encoded URL.
  """
  return urllib.quote_plus(url_string.encode(UTF8))


def _DecodeUrl(enc_url_string):
  """Inverts encoding from _EncodeUrl.

  Args:
    enc_url_string: String URL to decode.

  Returns:
    Decoded URL.
  """
  return urllib.unquote_plus(enc_url_string).decode(UTF8)


# pylint: disable=bare-except
def _BatchSort(in_iter, out_file):
  """Sorts input lines from in_iter and outputs to out_file.

  Sorts in batches as input arrives, so input file does not need to be loaded
  into memory all at once. Derived from Python Recipe 466302: Sorting big
  files the Python 2.4 way by Nicolas Lehuen.

  Sorted format is per _BuildTmpOutputLine. We're sorting on the entire line
  when we could just sort on the first record (URL); but the sort order is
  identical either way.

  Args:
    in_iter: Input iterator.
    out_file: Output file.
  """
  # Note: If chunk_files gets very large we can run out of open FDs. See .boto
  # file comments about rsync_buffer_lines. If increasing rsync_buffer_lines
  # doesn't suffice (e.g., for someone synchronizing with a really large
  # bucket), an option would be to make gsutil merge in passes, never
  # opening all chunk files simultaneously.
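  # Algorithm sketch: repeatedly pull up to buffer_size lines from in_iter,
  # sort each batch in memory, and spill it to its own temp chunk file; then
  # lazily merge the sorted chunk files into out_file with heapq.merge, which
  # holds only one pending line per chunk file at a time.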
  buffer_size = config.getint('GSUtil', 'rsync_buffer_lines', 32000)
  chunk_files = []
  try:
    while True:
      current_chunk = sorted(islice(in_iter, buffer_size))
      if not current_chunk:
        break
      output_chunk = io.open('%s-%06i' % (out_file.name, len(chunk_files)),
                             mode='w+', encoding=UTF8)
      chunk_files.append(output_chunk)
      output_chunk.write(unicode(''.join(current_chunk)))
      output_chunk.flush()
      output_chunk.seek(0)
    out_file.writelines(heapq.merge(*chunk_files))
  except IOError as e:
    if e.errno == errno.EMFILE:
      raise CommandException('\n'.join(textwrap.wrap(
          'Synchronization failed because too many open file handles were '
          'needed while building synchronization state. Please see the '
          'comments about rsync_buffer_lines in your .boto config file for a '
          'possible way to address this problem.')))
    raise
  finally:
    for chunk_file in chunk_files:
      try:
        chunk_file.close()
        os.remove(chunk_file.name)
      except:
        pass


class _DiffIterator(object):
  """Iterator yielding sequence of _DiffToApply objects."""

  def __init__(self, command_obj, base_src_url, base_dst_url):
    self.command_obj = command_obj
    self.compute_file_checksums = command_obj.compute_file_checksums
    self.delete_extras = command_obj.delete_extras
    self.recursion_requested = command_obj.recursion_requested
    self.logger = self.command_obj.logger
    self.base_src_url = base_src_url
    self.base_dst_url = base_dst_url
    self.logger.info('Building synchronization state...')

    (src_fh, self.sorted_list_src_file_name) = tempfile.mkstemp(
        prefix='gsutil-rsync-src-')
    _tmp_files.append(self.sorted_list_src_file_name)
    (dst_fh, self.sorted_list_dst_file_name) = tempfile.mkstemp(
        prefix='gsutil-rsync-dst-')
    _tmp_files.append(self.sorted_list_dst_file_name)
    # Close the file handles; the file will be opened in write mode by
    # _ListUrlRootFunc.
    os.close(src_fh)
    os.close(dst_fh)

    # Build sorted lists of src and dst URLs in parallel. To do this, pass
    # args to _ListUrlRootFunc as tuple (base_url_str, out_filename, desc)
    # where base_url_str is the starting URL string for listing.
    args_iter = iter([
        (self.base_src_url.url_string, self.sorted_list_src_file_name,
         'source'),
        (self.base_dst_url.url_string, self.sorted_list_dst_file_name,
         'destination')
    ])

    # Set to 1 by a worker if it hits a non-retryable listing failure; shared
    # across parallel workers so we can abort the rsync.
    command_obj.non_retryable_listing_failures = 0
    shared_attrs = ['non_retryable_listing_failures']
    command_obj.Apply(_ListUrlRootFunc, args_iter,
                      _RootListingExceptionHandler, shared_attrs,
                      arg_checker=DummyArgChecker,
                      parallel_operations_override=True,
                      fail_on_error=True)

    if command_obj.non_retryable_listing_failures:
      raise CommandException('Caught non-retryable exception - aborting rsync')

    self.sorted_list_src_file = open(self.sorted_list_src_file_name, 'r')
    self.sorted_list_dst_file = open(self.sorted_list_dst_file_name, 'r')

    # Wrap iterators in PluralityCheckableIterator so we can check emptiness.
    self.sorted_src_urls_it = PluralityCheckableIterator(
        iter(self.sorted_list_src_file))
    self.sorted_dst_urls_it = PluralityCheckableIterator(
        iter(self.sorted_list_dst_file))

  def _ParseTmpFileLine(self, line):
    """Parses output from _BuildTmpOutputLine.

    Parses into tuple:
      (URL, size, crc32c, md5)
    where crc32c and/or md5 can be _NA.

    Args:
      line: The line to parse.

    Returns:
      Parsed tuple: (url, size, crc32c, md5)
    """
    (encoded_url, size, crc32c, md5) = line.split()
    return (_DecodeUrl(encoded_url), int(size), crc32c, md5.strip())

  def _WarnIfMissingCloudHash(self, url_str, crc32c, md5):
    """Warns if url_str names a cloud object missing both crc32c and md5.

    Args:
      url_str: URL string to check (source or destination).
      crc32c: CRC32c for the URL, or _NA.
      md5: MD5 for the URL, or _NA.

    Returns:
      True if warning was issued.
    """
    # One known way this can currently happen is when rsync'ing objects larger
    # than 5 GB from S3 (for which the etag is not an MD5).
    if (StorageUrlFromString(url_str).IsCloudUrl()
        and crc32c == _NA and md5 == _NA):
      self.logger.warn(
          'Found no hashes to validate %s. Integrity cannot be assured '
          'without hashes.', url_str)
      return True
    return False

  def _ObjectsMatch(self, src_url_str, src_size, src_crc32c, src_md5,
                    dst_url_str, dst_size, dst_crc32c, dst_md5):
    """Returns True if src and dst objects are the same.

    Uses size plus whatever checksums are available.

    Args:
      src_url_str: Source URL string.
      src_size: Source size.
      src_crc32c: Source CRC32c.
      src_md5: Source MD5.
      dst_url_str: Destination URL string.
      dst_size: Destination size.
      dst_crc32c: Destination CRC32c.
      dst_md5: Destination MD5.

    Returns:
      True/False.
    """
    # Note: This function is called from __iter__, which is called from the
    # Command.Apply driver. Thus, all checksum computation will be run in a
    # single thread, which is good (having multiple threads concurrently
    # computing checksums would thrash the disk).
    if src_size != dst_size:
      return False
    if self.compute_file_checksums:
      (src_crc32c, src_md5, dst_crc32c, dst_md5) = _ComputeNeededFileChecksums(
          self.logger, src_url_str, src_size, src_crc32c, src_md5,
          dst_url_str, dst_size, dst_crc32c, dst_md5)
    if src_md5 != _NA and dst_md5 != _NA:
      self.logger.debug('Comparing md5 for %s and %s', src_url_str,
                        dst_url_str)
      return src_md5 == dst_md5
    if src_crc32c != _NA and dst_crc32c != _NA:
      self.logger.debug(
          'Comparing crc32c for %s and %s', src_url_str, dst_url_str)
      return src_crc32c == dst_crc32c
    if not self._WarnIfMissingCloudHash(src_url_str, src_crc32c, src_md5):
      self._WarnIfMissingCloudHash(dst_url_str, dst_crc32c, dst_md5)
    # Without checksums to compare we depend only on basic size comparison.
    return True

  def __iter__(self):
    """Iterates over src/dst URLs and produces a _DiffToApply sequence.

    Yields:
      The _DiffToApply.
    """
    # Strip trailing slashes, if any, so we compute tail length against
    # consistent position regardless of whether trailing slashes were included
    # or not in URL.
    base_src_url_len = len(self.base_src_url.url_string.rstrip('/\\'))
    base_dst_url_len = len(self.base_dst_url.url_string.rstrip('/\\'))
    src_url_str = dst_url_str = None
    # Invariant: After each yield, the URLs in src_url_str, dst_url_str,
    # self.sorted_src_urls_it, and self.sorted_dst_urls_it are not yet
    # processed. Each time we encounter None in src_url_str or dst_url_str we
    # populate from the respective iterator, and we reset one or the other
    # value to None after yielding an action that disposes of that URL.
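    # The loop below is effectively a sorted merge join over the two listings:
    # whichever side has the lexicographically smaller relative URL has no
    # counterpart on the other side, and equal relative URLs are compared by
    # size/checksum via _ObjectsMatch.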
    while not self.sorted_src_urls_it.IsEmpty() or src_url_str is not None:
      if src_url_str is None:
        (src_url_str, src_size, src_crc32c, src_md5) = self._ParseTmpFileLine(
            self.sorted_src_urls_it.next())
        # Skip past base URL and normalize slashes so we can compare across
        # clouds/file systems (including Windows).
        src_url_str_to_check = _EncodeUrl(
            src_url_str[base_src_url_len:].replace('\\', '/'))
        dst_url_str_would_copy_to = copy_helper.ConstructDstUrl(
            self.base_src_url, StorageUrlFromString(src_url_str), True, True,
            self.base_dst_url, False, self.recursion_requested).url_string
      if self.sorted_dst_urls_it.IsEmpty():
        # We've reached end of dst URLs, so copy src to dst.
        yield _DiffToApply(
            src_url_str, dst_url_str_would_copy_to, _DiffAction.COPY)
        src_url_str = None
        continue
      if not dst_url_str:
        (dst_url_str, dst_size, dst_crc32c, dst_md5) = (
            self._ParseTmpFileLine(self.sorted_dst_urls_it.next()))
        # Skip past base URL and normalize slashes so we can compare across
        # clouds/file systems (including Windows).
        dst_url_str_to_check = _EncodeUrl(
            dst_url_str[base_dst_url_len:].replace('\\', '/'))

      if src_url_str_to_check < dst_url_str_to_check:
        # There's no dst object corresponding to src object, so copy src to
        # dst.
        yield _DiffToApply(
            src_url_str, dst_url_str_would_copy_to, _DiffAction.COPY)
        src_url_str = None
      elif src_url_str_to_check > dst_url_str_to_check:
        # dst object without a corresponding src object, so remove dst if -d
        # option was specified.
        if self.delete_extras:
          yield _DiffToApply(None, dst_url_str, _DiffAction.REMOVE)
        dst_url_str = None
      else:
        # There is a dst object corresponding to src object, so check if
        # objects match.
        if self._ObjectsMatch(
            src_url_str, src_size, src_crc32c, src_md5,
            dst_url_str, dst_size, dst_crc32c, dst_md5):
          # Continue iterating without yielding a _DiffToApply.
          pass
        else:
          yield _DiffToApply(src_url_str, dst_url_str, _DiffAction.COPY)
        src_url_str = None
        dst_url_str = None

    # If the -d option was specified, any files/objects left in the dst
    # iteration should be removed.
    if not self.delete_extras:
      return
    if dst_url_str:
      yield _DiffToApply(None, dst_url_str, _DiffAction.REMOVE)
      dst_url_str = None
    for line in self.sorted_dst_urls_it:
      (dst_url_str, _, _, _) = self._ParseTmpFileLine(line)
      yield _DiffToApply(None, dst_url_str, _DiffAction.REMOVE)


def _RsyncFunc(cls, diff_to_apply, thread_state=None):
  """Worker function for performing the actual copy and remove operations."""
  gsutil_api = GetCloudApiInstance(cls, thread_state=thread_state)
  dst_url_str = diff_to_apply.dst_url_str
  dst_url = StorageUrlFromString(dst_url_str)
  if diff_to_apply.diff_action == _DiffAction.REMOVE:
    if cls.dryrun:
      cls.logger.info('Would remove %s', dst_url)
    else:
      cls.logger.info('Removing %s', dst_url)
      if dst_url.IsFileUrl():
        os.unlink(dst_url.object_name)
      else:
        try:
          gsutil_api.DeleteObject(
              dst_url.bucket_name, dst_url.object_name,
              generation=dst_url.generation, provider=dst_url.scheme)
        except NotFoundException:
          # If the object happened to be deleted by an external process, this
          # is fine because it moves us closer to the desired state.
          pass
  elif diff_to_apply.diff_action == _DiffAction.COPY:
    src_url_str = diff_to_apply.src_url_str
    src_url = StorageUrlFromString(src_url_str)
    if cls.dryrun:
      cls.logger.info('Would copy %s to %s', src_url, dst_url)
    else:
      try:
        copy_helper.PerformCopy(cls.logger, src_url, dst_url, gsutil_api, cls,
                                _RsyncExceptionHandler,
                                headers=cls.headers)
      except SkipUnsupportedObjectError as e:
        cls.logger.info('Skipping item %s with unsupported object type %s',
                        src_url, e.unsupported_type)
  else:
    raise CommandException('Got unexpected DiffAction (%s)'
                           % diff_to_apply.diff_action)


def _RootListingExceptionHandler(cls, e):
  """Simple exception handler for exceptions during listing URLs to sync."""
  cls.logger.error(str(e))


def _RsyncExceptionHandler(cls, e):
  """Simple exception handler to allow post-completion status."""
  cls.logger.error(str(e))
  cls.op_failure_count += 1
  cls.logger.debug('\n\nEncountered exception while syncing:\n%s\n',
                   traceback.format_exc())


class RsyncCommand(Command):
  """Implementation of gsutil rsync command."""

  # Command specification. See base class for documentation.
  command_spec = Command.CreateCommandSpec(
      'rsync',
      command_name_aliases=[],
      usage_synopsis=_SYNOPSIS,
      min_args=2,
      max_args=2,
      supported_sub_args='cCdenprRUx:',
      file_url_ok=True,
      provider_url_ok=False,
      urls_start_arg=0,
      gs_api_support=[ApiSelector.XML, ApiSelector.JSON],
      gs_default_api=ApiSelector.JSON,
      argparse_arguments=[
          CommandArgument.MakeNCloudOrFileURLsArgument(2)
      ]
  )
  # Help specification. See help_provider.py for documentation.
  help_spec = Command.HelpSpec(
      help_name='rsync',
      help_name_aliases=['sync', 'synchronize'],
      help_type='command_help',
      help_one_line_summary='Synchronize content of two buckets/directories',
      help_text=_DETAILED_HELP_TEXT,
      subcommand_help_text={},
  )
  total_bytes_transferred = 0

  def _InsistContainer(self, url_str, treat_nonexistent_object_as_subdir):
    """Sanity checks that URL names an existing container.

    Args:
      url_str: URL string to check.
      treat_nonexistent_object_as_subdir: Indicates whether to treat a
                                          non-existent object as a subdir.

    Returns:
      URL for checked string.

    Raises:
      CommandException if url_str doesn't name an existing container.
    """
    (url, have_existing_container) = (
        copy_helper.ExpandUrlToSingleBlr(url_str, self.gsutil_api, self.debug,
                                         self.project_id,
                                         treat_nonexistent_object_as_subdir))
    if not have_existing_container:
      raise CommandException(
          'arg (%s) does not name a directory, bucket, or bucket subdir.'
          % url_str)
    return url

  def RunCommand(self):
    """Command entry point for the rsync command."""
    self._ParseOpts()
    if self.compute_file_checksums and not UsingCrcmodExtension(crcmod):
      self.logger.warn(SLOW_CRCMOD_WARNING)

    src_url = self._InsistContainer(self.args[0], False)
    dst_url = self._InsistContainer(self.args[1], True)

    # Tracks if any copy or rm operations failed.
    self.op_failure_count = 0

    # List of attributes to share/manage across multiple processes in
    # parallel (-m) mode.
    shared_attrs = ['op_failure_count']

    for signal_num in GetCaughtSignals():
      RegisterSignalHandler(signal_num, _HandleSignals)

    # Perform sync requests in parallel (-m) mode, if requested, using
    # configured number of parallel processes and threads. Otherwise,
    # perform requests with sequential function calls in current process.
    diff_iterator = _DiffIterator(self, src_url, dst_url)
    self.logger.info('Starting synchronization')
    try:
      self.Apply(_RsyncFunc, diff_iterator, _RsyncExceptionHandler,
                 shared_attrs, arg_checker=_DiffToApplyArgChecker,
                 fail_on_error=True)
    finally:
      CleanUpTempFiles()

    if self.op_failure_count:
      plural_str = 's' if self.op_failure_count > 1 else ''
      raise CommandException(
          '%d file%s/object%s could not be copied/removed.' %
          (self.op_failure_count, plural_str, plural_str))

  def _ParseOpts(self):
    # exclude_symlinks is handled by Command parent class, so save in Command
    # state rather than CopyHelperOpts.
    self.exclude_symlinks = False
    # continue_on_error is handled by Command parent class, so save in Command
    # state rather than CopyHelperOpts.
    self.continue_on_error = False
    self.delete_extras = False
    preserve_acl = False
    self.compute_file_checksums = False
    self.dryrun = False
    self.exclude_pattern = None
    self.skip_unsupported_objects = False
    # self.recursion_requested is initialized in command.py (so it can be
    # checked in parent class for all commands).

    if self.sub_opts:
      for o, a in self.sub_opts:
        if o == '-c':
          self.compute_file_checksums = True
        # Note: In gsutil cp command this is specified using -c but here we
        # use -C so we can use -c for checksum arg (to be consistent with
        # Unix rsync command options).
        elif o == '-C':
          self.continue_on_error = True
        elif o == '-d':
          self.delete_extras = True
        elif o == '-e':
          self.exclude_symlinks = True
        elif o == '-n':
          self.dryrun = True
        elif o == '-p':
          preserve_acl = True
        elif o == '-r' or o == '-R':
          self.recursion_requested = True
        elif o == '-U':
          self.skip_unsupported_objects = True
        elif o == '-x':
          if not a:
            raise CommandException('Invalid blank exclude filter')
          try:
            self.exclude_pattern = re.compile(a)
          except re.error:
            raise CommandException('Invalid exclude filter (%s)' % a)
    return CreateCopyHelperOpts(
        preserve_acl=preserve_acl,
        skip_unsupported_objects=self.skip_unsupported_objects)