OLD | NEW |
| (Empty) |
1 # -*- coding: utf-8 -*- | |
2 # Copyright 2011 Google Inc. All Rights Reserved. | |
3 # Copyright 2011, Nexenta Systems Inc. | |
4 # | |
5 # Licensed under the Apache License, Version 2.0 (the "License"); | |
6 # you may not use this file except in compliance with the License. | |
7 # You may obtain a copy of the License at | |
8 # | |
9 # http://www.apache.org/licenses/LICENSE-2.0 | |
10 # | |
11 # Unless required by applicable law or agreed to in writing, software | |
12 # distributed under the License is distributed on an "AS IS" BASIS, | |
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
14 # See the License for the specific language governing permissions and | |
15 # limitations under the License. | |
16 """Helper functions for copy functionality.""" | |
17 | |
18 from __future__ import absolute_import | |
19 | |
20 import base64 | |
21 from collections import namedtuple | |
22 import csv | |
23 import datetime | |
24 import errno | |
25 import gzip | |
26 from hashlib import md5 | |
27 import json | |
28 import logging | |
29 import mimetypes | |
30 import multiprocessing | |
31 import os | |
32 import pickle | |
33 import random | |
34 import re | |
35 import shutil | |
36 import stat | |
37 import subprocess | |
38 import tempfile | |
39 import textwrap | |
40 import time | |
41 import traceback | |
42 | |
43 from boto import config | |
44 import crcmod | |
45 | |
46 import gslib | |
47 from gslib.cloud_api import ArgumentException | |
48 from gslib.cloud_api import CloudApi | |
49 from gslib.cloud_api import NotFoundException | |
50 from gslib.cloud_api import PreconditionException | |
51 from gslib.cloud_api import Preconditions | |
52 from gslib.cloud_api import ResumableDownloadException | |
53 from gslib.cloud_api import ResumableUploadAbortException | |
54 from gslib.cloud_api import ResumableUploadException | |
55 from gslib.cloud_api import ResumableUploadStartOverException | |
56 from gslib.cloud_api_helper import GetDownloadSerializationDict | |
57 from gslib.commands.compose import MAX_COMPOSE_ARITY | |
58 from gslib.commands.config import DEFAULT_PARALLEL_COMPOSITE_UPLOAD_COMPONENT_SI
ZE | |
59 from gslib.commands.config import DEFAULT_PARALLEL_COMPOSITE_UPLOAD_THRESHOLD | |
60 from gslib.cs_api_map import ApiSelector | |
61 from gslib.daisy_chain_wrapper import DaisyChainWrapper | |
62 from gslib.exception import CommandException | |
63 from gslib.exception import HashMismatchException | |
64 from gslib.file_part import FilePart | |
65 from gslib.hashing_helper import Base64EncodeHash | |
66 from gslib.hashing_helper import CalculateB64EncodedMd5FromContents | |
67 from gslib.hashing_helper import CalculateHashesFromContents | |
68 from gslib.hashing_helper import GetDownloadHashAlgs | |
69 from gslib.hashing_helper import GetUploadHashAlgs | |
70 from gslib.hashing_helper import HashingFileUploadWrapper | |
71 from gslib.parallelism_framework_util import ThreadAndProcessSafeDict | |
72 from gslib.parallelism_framework_util import ThreadSafeDict | |
73 from gslib.progress_callback import ConstructAnnounceText | |
74 from gslib.progress_callback import FileProgressCallbackHandler | |
75 from gslib.progress_callback import ProgressCallbackWithBackoff | |
76 from gslib.resumable_streaming_upload import ResumableStreamingJsonUploadWrapper | |
77 from gslib.storage_url import ContainsWildcard | |
78 from gslib.storage_url import StorageUrlFromString | |
79 from gslib.third_party.storage_apitools import storage_v1_messages as apitools_m
essages | |
80 from gslib.tracker_file import DeleteTrackerFile | |
81 from gslib.tracker_file import GetTrackerFilePath | |
82 from gslib.tracker_file import RaiseUnwritableTrackerFileException | |
83 from gslib.tracker_file import ReadOrCreateDownloadTrackerFile | |
84 from gslib.tracker_file import TrackerFileType | |
85 from gslib.translation_helper import AddS3MarkerAclToObjectMetadata | |
86 from gslib.translation_helper import CopyObjectMetadata | |
87 from gslib.translation_helper import DEFAULT_CONTENT_TYPE | |
88 from gslib.translation_helper import GenerationFromUrlAndString | |
89 from gslib.translation_helper import ObjectMetadataFromHeaders | |
90 from gslib.translation_helper import PreconditionsFromHeaders | |
91 from gslib.translation_helper import S3MarkerAclFromObjectMetadata | |
92 from gslib.util import CreateLock | |
93 from gslib.util import DEFAULT_FILE_BUFFER_SIZE | |
94 from gslib.util import GetCloudApiInstance | |
95 from gslib.util import GetFileSize | |
96 from gslib.util import GetJsonResumableChunkSize | |
97 from gslib.util import GetMaxRetryDelay | |
98 from gslib.util import GetNumRetries | |
99 from gslib.util import GetStreamFromFileUrl | |
100 from gslib.util import HumanReadableToBytes | |
101 from gslib.util import IS_WINDOWS | |
102 from gslib.util import IsCloudSubdirPlaceholder | |
103 from gslib.util import MakeHumanReadable | |
104 from gslib.util import MIN_SIZE_COMPUTE_LOGGING | |
105 from gslib.util import MultiprocessingIsAvailable | |
106 from gslib.util import ResumableThreshold | |
107 from gslib.util import TEN_MIB | |
108 from gslib.util import UTF8 | |
109 from gslib.wildcard_iterator import CreateWildcardIterator | |
110 | |
111 # pylint: disable=g-import-not-at-top | |
112 if IS_WINDOWS: | |
113 import msvcrt | |
114 from ctypes import c_int | |
115 from ctypes import c_uint64 | |
116 from ctypes import c_char_p | |
117 from ctypes import c_wchar_p | |
118 from ctypes import windll | |
119 from ctypes import POINTER | |
120 from ctypes import WINFUNCTYPE | |
121 from ctypes import WinError | |
122 | |
123 # Declare copy_helper_opts as a global because namedtuple isn't aware of | |
124 # assigning to a class member (which breaks pickling done by multiprocessing). | |
125 # For details see | |
126 # http://stackoverflow.com/questions/16377215/how-to-pickle-a-namedtuple-instanc
e-correctly | |
127 # Similarly can't pickle logger. | |
128 # pylint: disable=global-at-module-level | |
129 global global_copy_helper_opts, global_logger | |
130 | |
131 # In-memory map of local files that are currently opened for write. Used to | |
132 # ensure that if we write to the same file twice (say, for example, because the | |
133 # user specified two identical source URLs), the writes occur serially. | |
134 global open_files_map | |
135 open_files_map = ( | |
136 ThreadSafeDict() if (IS_WINDOWS or not MultiprocessingIsAvailable()[0]) | |
137 else ThreadAndProcessSafeDict(multiprocessing.Manager())) | |
138 | |
139 # For debugging purposes; if True, files and objects that fail hash validation | |
140 # will be saved with the below suffix appended. | |
141 _RENAME_ON_HASH_MISMATCH = False | |
142 _RENAME_ON_HASH_MISMATCH_SUFFIX = '_corrupt' | |
143 | |
144 PARALLEL_UPLOAD_TEMP_NAMESPACE = ( | |
145 u'/gsutil/tmp/parallel_composite_uploads/for_details_see/gsutil_help_cp/') | |
146 | |
147 PARALLEL_UPLOAD_STATIC_SALT = u""" | |
148 PARALLEL_UPLOAD_SALT_TO_PREVENT_COLLISIONS. | |
149 The theory is that no user will have prepended this to the front of | |
150 one of their object names and then done an MD5 hash of the name, and | |
151 then prepended PARALLEL_UPLOAD_TEMP_NAMESPACE to the front of their object | |
152 name. Note that there will be no problems with object name length since we | |
153 hash the original name. | |
154 """ | |
155 | |
156 # When uploading a file, get the following fields in the response for | |
157 # filling in command output and manifests. | |
158 UPLOAD_RETURN_FIELDS = ['crc32c', 'etag', 'generation', 'md5Hash', 'size'] | |
159 | |
160 # This tuple is used only to encapsulate the arguments needed for | |
161 # command.Apply() in the parallel composite upload case. | |
162 # Note that content_type is used instead of a full apitools Object() because | |
163 # apitools objects are not picklable. | |
164 # filename: String name of file. | |
165 # file_start: start byte of file (may be in the middle of a file for partitioned | |
166 # files). | |
167 # file_length: length of upload (may not be the entire length of a file for | |
168 # partitioned files). | |
169 # src_url: FileUrl describing the source file. | |
170 # dst_url: CloudUrl describing the destination component file. | |
171 # canned_acl: canned_acl to apply to the uploaded file/component. | |
172 # content_type: content-type for final object, used for setting content-type | |
173 # of components and final object. | |
174 # tracker_file: tracker file for this component. | |
175 # tracker_file_lock: tracker file lock for tracker file(s). | |
176 PerformParallelUploadFileToObjectArgs = namedtuple( | |
177 'PerformParallelUploadFileToObjectArgs', | |
178 'filename file_start file_length src_url dst_url canned_acl ' | |
179 'content_type tracker_file tracker_file_lock') | |
180 | |
181 ObjectFromTracker = namedtuple('ObjectFromTracker', | |
182 'object_name generation') | |
183 | |
184 # TODO: Refactor this file to be less cumbersome. In particular, some of the | |
185 # different paths (e.g., uploading a file to an object vs. downloading an | |
186 # object to a file) could be split into separate files. | |
187 | |
188 # Chunk size to use while zipping/unzipping gzip files. | |
189 GZIP_CHUNK_SIZE = 8192 | |
190 | |
191 PARALLEL_COMPOSITE_SUGGESTION_THRESHOLD = 150 * 1024 * 1024 | |
192 | |
193 # S3 requires special Multipart upload logic (that we currently don't implement) | |
194 # for files > 5GiB in size. | |
195 S3_MAX_UPLOAD_SIZE = 5 * 1024 * 1024 * 1024 | |
196 | |
197 suggested_parallel_composites = False | |
198 | |
199 | |
200 class FileConcurrencySkipError(Exception): | |
201 """Raised when skipping a file due to a concurrent, duplicate copy.""" | |
202 | |
203 | |
204 def _RmExceptionHandler(cls, e): | |
205 """Simple exception handler to allow post-completion status.""" | |
206 cls.logger.error(str(e)) | |
207 | |
208 | |
209 def _ParallelUploadCopyExceptionHandler(cls, e): | |
210 """Simple exception handler to allow post-completion status.""" | |
211 cls.logger.error(str(e)) | |
212 cls.op_failure_count += 1 | |
213 cls.logger.debug('\n\nEncountered exception while copying:\n%s\n', | |
214 traceback.format_exc()) | |
215 | |
216 | |
217 def _PerformParallelUploadFileToObject(cls, args, thread_state=None): | |
218 """Function argument to Apply for performing parallel composite uploads. | |
219 | |
220 Args: | |
221 cls: Calling Command class. | |
222 args: PerformParallelUploadFileToObjectArgs tuple describing the target. | |
223 thread_state: gsutil Cloud API instance to use for the operation. | |
224 | |
225 Returns: | |
226 StorageUrl representing a successfully uploaded component. | |
227 """ | |
228 fp = FilePart(args.filename, args.file_start, args.file_length) | |
229 gsutil_api = GetCloudApiInstance(cls, thread_state=thread_state) | |
230 with fp: | |
231 # We take many precautions with the component names that make collisions | |
232 # effectively impossible. Specifying preconditions will just allow us to | |
233 # reach a state in which uploads will always fail on retries. | |
234 preconditions = None | |
235 | |
236 # Fill in content type if one was provided. | |
237 dst_object_metadata = apitools_messages.Object( | |
238 name=args.dst_url.object_name, | |
239 bucket=args.dst_url.bucket_name, | |
240 contentType=args.content_type) | |
241 | |
242 try: | |
243 if global_copy_helper_opts.canned_acl: | |
244 # No canned ACL support in JSON, force XML API to be used for | |
245 # upload/copy operations. | |
246 orig_prefer_api = gsutil_api.prefer_api | |
247 gsutil_api.prefer_api = ApiSelector.XML | |
248 ret = _UploadFileToObject(args.src_url, fp, args.file_length, | |
249 args.dst_url, dst_object_metadata, | |
250 preconditions, gsutil_api, cls.logger, cls, | |
251 _ParallelUploadCopyExceptionHandler, | |
252 gzip_exts=None, allow_splitting=False) | |
253 finally: | |
254 if global_copy_helper_opts.canned_acl: | |
255 gsutil_api.prefer_api = orig_prefer_api | |
256 | |
257 component = ret[2] | |
258 _AppendComponentTrackerToParallelUploadTrackerFile( | |
259 args.tracker_file, component, args.tracker_file_lock) | |
260 return ret | |
261 | |
262 | |
263 CopyHelperOpts = namedtuple('CopyHelperOpts', [ | |
264 'perform_mv', | |
265 'no_clobber', | |
266 'daisy_chain', | |
267 'read_args_from_stdin', | |
268 'print_ver', | |
269 'use_manifest', | |
270 'preserve_acl', | |
271 'canned_acl', | |
272 'skip_unsupported_objects', | |
273 'test_callback_file']) | |
274 | |
275 | |
276 # pylint: disable=global-variable-undefined | |
277 def CreateCopyHelperOpts(perform_mv=False, no_clobber=False, daisy_chain=False, | |
278 read_args_from_stdin=False, print_ver=False, | |
279 use_manifest=False, preserve_acl=False, | |
280 canned_acl=None, skip_unsupported_objects=False, | |
281 test_callback_file=None): | |
282 """Creates CopyHelperOpts for passing options to CopyHelper.""" | |
283 # We create a tuple with union of options needed by CopyHelper and any | |
284 # copy-related functionality in CpCommand, RsyncCommand, or Command class. | |
285 global global_copy_helper_opts | |
286 global_copy_helper_opts = CopyHelperOpts( | |
287 perform_mv=perform_mv, | |
288 no_clobber=no_clobber, | |
289 daisy_chain=daisy_chain, | |
290 read_args_from_stdin=read_args_from_stdin, | |
291 print_ver=print_ver, | |
292 use_manifest=use_manifest, | |
293 preserve_acl=preserve_acl, | |
294 canned_acl=canned_acl, | |
295 skip_unsupported_objects=skip_unsupported_objects, | |
296 test_callback_file=test_callback_file) | |
297 return global_copy_helper_opts | |
298 | |
299 | |
300 # pylint: disable=global-variable-undefined | |
301 # pylint: disable=global-variable-not-assigned | |
302 def GetCopyHelperOpts(): | |
303 """Returns namedtuple holding CopyHelper options.""" | |
304 global global_copy_helper_opts | |
305 return global_copy_helper_opts | |
306 | |
307 | |
308 def _SelectDownloadStrategy(dst_url): | |
309 """Get download strategy based on the destination object. | |
310 | |
311 Args: | |
312 dst_url: Destination StorageUrl. | |
313 | |
314 Returns: | |
315 gsutil Cloud API DownloadStrategy. | |
316 """ | |
317 dst_is_special = False | |
318 if dst_url.IsFileUrl(): | |
319 # Check explicitly first because os.stat doesn't work on 'nul' in Windows. | |
320 if dst_url.object_name == os.devnull: | |
321 dst_is_special = True | |
322 try: | |
323 mode = os.stat(dst_url.object_name).st_mode | |
324 if stat.S_ISCHR(mode): | |
325 dst_is_special = True | |
326 except OSError: | |
327 pass | |
328 | |
329 if dst_is_special: | |
330 return CloudApi.DownloadStrategy.ONE_SHOT | |
331 else: | |
332 return CloudApi.DownloadStrategy.RESUMABLE | |
333 | |
334 | |
335 def _GetUploadTrackerData(tracker_file_name, logger): | |
336 """Reads tracker data from an upload tracker file if it exists. | |
337 | |
338 Args: | |
339 tracker_file_name: Tracker file name for this upload. | |
340 logger: for outputting log messages. | |
341 | |
342 Returns: | |
343 Serialization data if the tracker file already exists (resume existing | |
344 upload), None otherwise. | |
345 """ | |
346 tracker_file = None | |
347 | |
348 # If we already have a matching tracker file, get the serialization data | |
349 # so that we can resume the upload. | |
350 try: | |
351 tracker_file = open(tracker_file_name, 'r') | |
352 tracker_data = tracker_file.read() | |
353 return tracker_data | |
354 except IOError as e: | |
355 # Ignore non-existent file (happens first time a upload is attempted on an | |
356 # object, or when re-starting an upload after a | |
357 # ResumableUploadStartOverException), but warn user for other errors. | |
358 if e.errno != errno.ENOENT: | |
359 logger.warn('Couldn\'t read upload tracker file (%s): %s. Restarting ' | |
360 'upload from scratch.', tracker_file_name, e.strerror) | |
361 finally: | |
362 if tracker_file: | |
363 tracker_file.close() | |
364 | |
365 | |
366 def InsistDstUrlNamesContainer(exp_dst_url, have_existing_dst_container, | |
367 command_name): | |
368 """Ensures the destination URL names a container. | |
369 | |
370 Acceptable containers include directory, bucket, bucket | |
371 subdir, and non-existent bucket subdir. | |
372 | |
373 Args: | |
374 exp_dst_url: Wildcard-expanded destination StorageUrl. | |
375 have_existing_dst_container: bool indicator of whether exp_dst_url | |
376 names a container (directory, bucket, or existing bucket subdir). | |
377 command_name: Name of command making call. May not be the same as the | |
378 calling class's self.command_name in the case of commands implemented | |
379 atop other commands (like mv command). | |
380 | |
381 Raises: | |
382 CommandException: if the URL being checked does not name a container. | |
383 """ | |
384 if ((exp_dst_url.IsFileUrl() and not exp_dst_url.IsDirectory()) or | |
385 (exp_dst_url.IsCloudUrl() and exp_dst_url.IsBucket() | |
386 and not have_existing_dst_container)): | |
387 raise CommandException('Destination URL must name a directory, bucket, ' | |
388 'or bucket\nsubdirectory for the multiple ' | |
389 'source form of the %s command.' % command_name) | |
390 | |
391 | |
392 def _ShouldTreatDstUrlAsBucketSubDir(have_multiple_srcs, dst_url, | |
393 have_existing_dest_subdir, | |
394 src_url_names_container, | |
395 recursion_requested): | |
396 """Checks whether dst_url should be treated as a bucket "sub-directory". | |
397 | |
398 The decision about whether something constitutes a bucket "sub-directory" | |
399 depends on whether there are multiple sources in this request and whether | |
400 there is an existing bucket subdirectory. For example, when running the | |
401 command: | |
402 gsutil cp file gs://bucket/abc | |
403 if there's no existing gs://bucket/abc bucket subdirectory we should copy | |
404 file to the object gs://bucket/abc. In contrast, if | |
405 there's an existing gs://bucket/abc bucket subdirectory we should copy | |
406 file to gs://bucket/abc/file. And regardless of whether gs://bucket/abc | |
407 exists, when running the command: | |
408 gsutil cp file1 file2 gs://bucket/abc | |
409 we should copy file1 to gs://bucket/abc/file1 (and similarly for file2). | |
410 Finally, for recursive copies, if the source is a container then we should | |
411 copy to a container as the target. For example, when running the command: | |
412 gsutil cp -r dir1 gs://bucket/dir2 | |
413 we should copy the subtree of dir1 to gs://bucket/dir2. | |
414 | |
415 Note that we don't disallow naming a bucket "sub-directory" where there's | |
416 already an object at that URL. For example it's legitimate (albeit | |
417 confusing) to have an object called gs://bucket/dir and | |
418 then run the command | |
419 gsutil cp file1 file2 gs://bucket/dir | |
420 Doing so will end up with objects gs://bucket/dir, gs://bucket/dir/file1, | |
421 and gs://bucket/dir/file2. | |
422 | |
423 Args: | |
424 have_multiple_srcs: Bool indicator of whether this is a multi-source | |
425 operation. | |
426 dst_url: StorageUrl to check. | |
427 have_existing_dest_subdir: bool indicator whether dest is an existing | |
428 subdirectory. | |
429 src_url_names_container: bool indicator of whether the source URL | |
430 is a container. | |
431 recursion_requested: True if a recursive operation has been requested. | |
432 | |
433 Returns: | |
434 bool indicator. | |
435 """ | |
436 if have_existing_dest_subdir: | |
437 return True | |
438 if dst_url.IsCloudUrl(): | |
439 return (have_multiple_srcs or | |
440 (src_url_names_container and recursion_requested)) | |
441 | |
442 | |
443 def _ShouldTreatDstUrlAsSingleton(have_multiple_srcs, | |
444 have_existing_dest_subdir, dst_url, | |
445 recursion_requested): | |
446 """Checks that dst_url names a single file/object after wildcard expansion. | |
447 | |
448 It is possible that an object path might name a bucket sub-directory. | |
449 | |
450 Args: | |
451 have_multiple_srcs: Bool indicator of whether this is a multi-source | |
452 operation. | |
453 have_existing_dest_subdir: bool indicator whether dest is an existing | |
454 subdirectory. | |
455 dst_url: StorageUrl to check. | |
456 recursion_requested: True if a recursive operation has been requested. | |
457 | |
458 Returns: | |
459 bool indicator. | |
460 """ | |
461 if recursion_requested: | |
462 return False | |
463 if dst_url.IsFileUrl(): | |
464 return not dst_url.IsDirectory() | |
465 else: # dst_url.IsCloudUrl() | |
466 return (not have_multiple_srcs and | |
467 not have_existing_dest_subdir and | |
468 dst_url.IsObject()) | |
469 | |
470 | |
471 def ConstructDstUrl(src_url, exp_src_url, src_url_names_container, | |
472 have_multiple_srcs, exp_dst_url, have_existing_dest_subdir, | |
473 recursion_requested): | |
474 """Constructs the destination URL for a given exp_src_url/exp_dst_url pair. | |
475 | |
476 Uses context-dependent naming rules that mimic Linux cp and mv behavior. | |
477 | |
478 Args: | |
479 src_url: Source StorageUrl to be copied. | |
480 exp_src_url: Single StorageUrl from wildcard expansion of src_url. | |
481 src_url_names_container: True if src_url names a container (including the | |
482 case of a wildcard-named bucket subdir (like gs://bucket/abc, | |
483 where gs://bucket/abc/* matched some objects). | |
484 have_multiple_srcs: True if this is a multi-source request. This can be | |
485 true if src_url wildcard-expanded to multiple URLs or if there were | |
486 multiple source URLs in the request. | |
487 exp_dst_url: the expanded StorageUrl requested for the cp destination. | |
488 Final written path is constructed from this plus a context-dependent | |
489 variant of src_url. | |
490 have_existing_dest_subdir: bool indicator whether dest is an existing | |
491 subdirectory. | |
492 recursion_requested: True if a recursive operation has been requested. | |
493 | |
494 Returns: | |
495 StorageUrl to use for copy. | |
496 | |
497 Raises: | |
498 CommandException if destination object name not specified for | |
499 source and source is a stream. | |
500 """ | |
501 if _ShouldTreatDstUrlAsSingleton( | |
502 have_multiple_srcs, have_existing_dest_subdir, exp_dst_url, | |
503 recursion_requested): | |
504 # We're copying one file or object to one file or object. | |
505 return exp_dst_url | |
506 | |
507 if exp_src_url.IsFileUrl() and exp_src_url.IsStream(): | |
508 if have_existing_dest_subdir: | |
509 raise CommandException('Destination object name needed when ' | |
510 'source is a stream') | |
511 return exp_dst_url | |
512 | |
513 if not recursion_requested and not have_multiple_srcs: | |
514 # We're copying one file or object to a subdirectory. Append final comp | |
515 # of exp_src_url to exp_dst_url. | |
516 src_final_comp = exp_src_url.object_name.rpartition(src_url.delim)[-1] | |
517 return StorageUrlFromString('%s%s%s' % ( | |
518 exp_dst_url.url_string.rstrip(exp_dst_url.delim), | |
519 exp_dst_url.delim, src_final_comp)) | |
520 | |
521 # Else we're copying multiple sources to a directory, bucket, or a bucket | |
522 # "sub-directory". | |
523 | |
524 # Ensure exp_dst_url ends in delim char if we're doing a multi-src copy or | |
525 # a copy to a directory. (The check for copying to a directory needs | |
526 # special-case handling so that the command: | |
527 # gsutil cp gs://bucket/obj dir | |
528 # will turn into file://dir/ instead of file://dir -- the latter would cause | |
529 # the file "dirobj" to be created.) | |
530 # Note: need to check have_multiple_srcs or src_url.names_container() | |
531 # because src_url could be a bucket containing a single object, named | |
532 # as gs://bucket. | |
533 if ((have_multiple_srcs or src_url_names_container or | |
534 (exp_dst_url.IsFileUrl() and exp_dst_url.IsDirectory())) | |
535 and not exp_dst_url.url_string.endswith(exp_dst_url.delim)): | |
536 exp_dst_url = StorageUrlFromString('%s%s' % (exp_dst_url.url_string, | |
537 exp_dst_url.delim)) | |
538 | |
539 # Making naming behavior match how things work with local Linux cp and mv | |
540 # operations depends on many factors, including whether the destination is a | |
541 # container, the plurality of the source(s), and whether the mv command is | |
542 # being used: | |
543 # 1. For the "mv" command that specifies a non-existent destination subdir, | |
544 # renaming should occur at the level of the src subdir, vs appending that | |
545 # subdir beneath the dst subdir like is done for copying. For example: | |
546 # gsutil rm -r gs://bucket | |
547 # gsutil cp -r dir1 gs://bucket | |
548 # gsutil cp -r dir2 gs://bucket/subdir1 | |
549 # gsutil mv gs://bucket/subdir1 gs://bucket/subdir2 | |
550 # would (if using cp naming behavior) end up with paths like: | |
551 # gs://bucket/subdir2/subdir1/dir2/.svn/all-wcprops | |
552 # whereas mv naming behavior should result in: | |
553 # gs://bucket/subdir2/dir2/.svn/all-wcprops | |
554 # 2. Copying from directories, buckets, or bucket subdirs should result in | |
555 # objects/files mirroring the source directory hierarchy. For example: | |
556 # gsutil cp dir1/dir2 gs://bucket | |
557 # should create the object gs://bucket/dir2/file2, assuming dir1/dir2 | |
558 # contains file2). | |
559 # To be consistent with Linux cp behavior, there's one more wrinkle when | |
560 # working with subdirs: The resulting object names depend on whether the | |
561 # destination subdirectory exists. For example, if gs://bucket/subdir | |
562 # exists, the command: | |
563 # gsutil cp -r dir1/dir2 gs://bucket/subdir | |
564 # should create objects named like gs://bucket/subdir/dir2/a/b/c. In | |
565 # contrast, if gs://bucket/subdir does not exist, this same command | |
566 # should create objects named like gs://bucket/subdir/a/b/c. | |
567 # 3. Copying individual files or objects to dirs, buckets or bucket subdirs | |
568 # should result in objects/files named by the final source file name | |
569 # component. Example: | |
570 # gsutil cp dir1/*.txt gs://bucket | |
571 # should create the objects gs://bucket/f1.txt and gs://bucket/f2.txt, | |
572 # assuming dir1 contains f1.txt and f2.txt. | |
573 | |
574 recursive_move_to_new_subdir = False | |
575 if (global_copy_helper_opts.perform_mv and recursion_requested | |
576 and src_url_names_container and not have_existing_dest_subdir): | |
577 # Case 1. Handle naming rules for bucket subdir mv. Here we want to | |
578 # line up the src_url against its expansion, to find the base to build | |
579 # the new name. For example, running the command: | |
580 # gsutil mv gs://bucket/abcd gs://bucket/xyz | |
581 # when processing exp_src_url=gs://bucket/abcd/123 | |
582 # exp_src_url_tail should become /123 | |
583 # Note: mv.py code disallows wildcard specification of source URL. | |
584 recursive_move_to_new_subdir = True | |
585 exp_src_url_tail = ( | |
586 exp_src_url.url_string[len(src_url.url_string):]) | |
587 dst_key_name = '%s/%s' % (exp_dst_url.object_name.rstrip('/'), | |
588 exp_src_url_tail.strip('/')) | |
589 | |
590 elif src_url_names_container and (exp_dst_url.IsCloudUrl() or | |
591 exp_dst_url.IsDirectory()): | |
592 # Case 2. Container copy to a destination other than a file. | |
593 # Build dst_key_name from subpath of exp_src_url past | |
594 # where src_url ends. For example, for src_url=gs://bucket/ and | |
595 # exp_src_url=gs://bucket/src_subdir/obj, dst_key_name should be | |
596 # src_subdir/obj. | |
597 src_url_path_sans_final_dir = GetPathBeforeFinalDir(src_url) | |
598 dst_key_name = exp_src_url.versionless_url_string[ | |
599 len(src_url_path_sans_final_dir):].lstrip(src_url.delim) | |
600 # Handle case where dst_url is a non-existent subdir. | |
601 if not have_existing_dest_subdir: | |
602 dst_key_name = dst_key_name.partition(src_url.delim)[-1] | |
603 # Handle special case where src_url was a directory named with '.' or | |
604 # './', so that running a command like: | |
605 # gsutil cp -r . gs://dest | |
606 # will produce obj names of the form gs://dest/abc instead of | |
607 # gs://dest/./abc. | |
608 if dst_key_name.startswith('.%s' % os.sep): | |
609 dst_key_name = dst_key_name[2:] | |
610 | |
611 else: | |
612 # Case 3. | |
613 dst_key_name = exp_src_url.object_name.rpartition(src_url.delim)[-1] | |
614 | |
615 if (not recursive_move_to_new_subdir and ( | |
616 exp_dst_url.IsFileUrl() or _ShouldTreatDstUrlAsBucketSubDir( | |
617 have_multiple_srcs, exp_dst_url, have_existing_dest_subdir, | |
618 src_url_names_container, recursion_requested))): | |
619 if exp_dst_url.object_name and exp_dst_url.object_name.endswith( | |
620 exp_dst_url.delim): | |
621 dst_key_name = '%s%s%s' % ( | |
622 exp_dst_url.object_name.rstrip(exp_dst_url.delim), | |
623 exp_dst_url.delim, dst_key_name) | |
624 else: | |
625 delim = exp_dst_url.delim if exp_dst_url.object_name else '' | |
626 dst_key_name = '%s%s%s' % (exp_dst_url.object_name or '', | |
627 delim, dst_key_name) | |
628 | |
629 new_exp_dst_url = exp_dst_url.Clone() | |
630 new_exp_dst_url.object_name = dst_key_name.replace(src_url.delim, | |
631 exp_dst_url.delim) | |
632 return new_exp_dst_url | |
633 | |
634 | |
635 def _CreateDigestsFromDigesters(digesters): | |
636 digests = {} | |
637 if digesters: | |
638 for alg in digesters: | |
639 digests[alg] = base64.encodestring( | |
640 digesters[alg].digest()).rstrip('\n') | |
641 return digests | |
642 | |
643 | |
644 def _CreateDigestsFromLocalFile(logger, algs, file_name, src_obj_metadata): | |
645 """Creates a base64 CRC32C and/or MD5 digest from file_name. | |
646 | |
647 Args: | |
648 logger: for outputting log messages. | |
649 algs: list of algorithms to compute. | |
650 file_name: file to digest. | |
651 src_obj_metadata: metadta of source object. | |
652 | |
653 Returns: | |
654 Dict of algorithm name : base 64 encoded digest | |
655 """ | |
656 hash_dict = {} | |
657 if 'md5' in algs: | |
658 if src_obj_metadata.size and src_obj_metadata.size > TEN_MIB: | |
659 logger.info( | |
660 'Computing MD5 for %s...', file_name) | |
661 hash_dict['md5'] = md5() | |
662 if 'crc32c' in algs: | |
663 hash_dict['crc32c'] = crcmod.predefined.Crc('crc-32c') | |
664 with open(file_name, 'rb') as fp: | |
665 CalculateHashesFromContents( | |
666 fp, hash_dict, ProgressCallbackWithBackoff( | |
667 src_obj_metadata.size, | |
668 FileProgressCallbackHandler( | |
669 ConstructAnnounceText('Hashing', file_name), logger).call)) | |
670 digests = {} | |
671 for alg_name, digest in hash_dict.iteritems(): | |
672 digests[alg_name] = Base64EncodeHash(digest.hexdigest()) | |
673 return digests | |
674 | |
675 | |
676 def _CheckCloudHashes(logger, src_url, dst_url, src_obj_metadata, | |
677 dst_obj_metadata): | |
678 """Validates integrity of two cloud objects copied via daisy-chain. | |
679 | |
680 Args: | |
681 logger: for outputting log messages. | |
682 src_url: CloudUrl for source cloud object. | |
683 dst_url: CloudUrl for destination cloud object. | |
684 src_obj_metadata: Cloud Object metadata for object being downloaded from. | |
685 dst_obj_metadata: Cloud Object metadata for object being uploaded to. | |
686 | |
687 Raises: | |
688 CommandException: if cloud digests don't match local digests. | |
689 """ | |
690 checked_one = False | |
691 download_hashes = {} | |
692 upload_hashes = {} | |
693 if src_obj_metadata.md5Hash: | |
694 download_hashes['md5'] = src_obj_metadata.md5Hash | |
695 if src_obj_metadata.crc32c: | |
696 download_hashes['crc32c'] = src_obj_metadata.crc32c | |
697 if dst_obj_metadata.md5Hash: | |
698 upload_hashes['md5'] = dst_obj_metadata.md5Hash | |
699 if dst_obj_metadata.crc32c: | |
700 upload_hashes['crc32c'] = dst_obj_metadata.crc32c | |
701 | |
702 for alg, upload_b64_digest in upload_hashes.iteritems(): | |
703 if alg not in download_hashes: | |
704 continue | |
705 | |
706 download_b64_digest = download_hashes[alg] | |
707 logger.debug( | |
708 'Comparing source vs destination %s-checksum for %s. (%s/%s)', alg, | |
709 dst_url, download_b64_digest, upload_b64_digest) | |
710 if download_b64_digest != upload_b64_digest: | |
711 raise HashMismatchException( | |
712 '%s signature for source object (%s) doesn\'t match ' | |
713 'destination object digest (%s). Object (%s) will be deleted.' % ( | |
714 alg, download_b64_digest, upload_b64_digest, dst_url)) | |
715 checked_one = True | |
716 if not checked_one: | |
717 # One known way this can currently happen is when downloading objects larger | |
718 # than 5 GiB from S3 (for which the etag is not an MD5). | |
719 logger.warn( | |
720 'WARNING: Found no hashes to validate object downloaded from %s and ' | |
721 'uploaded to %s. Integrity cannot be assured without hashes.', | |
722 src_url, dst_url) | |
723 | |
724 | |
725 def _CheckHashes(logger, obj_url, obj_metadata, file_name, digests, | |
726 is_upload=False): | |
727 """Validates integrity by comparing cloud digest to local digest. | |
728 | |
729 Args: | |
730 logger: for outputting log messages. | |
731 obj_url: CloudUrl for cloud object. | |
732 obj_metadata: Cloud Object being downloaded from or uploaded to. | |
733 file_name: Local file name on disk being downloaded to or uploaded from. | |
734 digests: Computed Digests for the object. | |
735 is_upload: If true, comparing for an uploaded object (controls logging). | |
736 | |
737 Raises: | |
738 CommandException: if cloud digests don't match local digests. | |
739 """ | |
740 local_hashes = digests | |
741 cloud_hashes = {} | |
742 if obj_metadata.md5Hash: | |
743 cloud_hashes['md5'] = obj_metadata.md5Hash.rstrip('\n') | |
744 if obj_metadata.crc32c: | |
745 cloud_hashes['crc32c'] = obj_metadata.crc32c.rstrip('\n') | |
746 | |
747 checked_one = False | |
748 for alg in local_hashes: | |
749 if alg not in cloud_hashes: | |
750 continue | |
751 | |
752 local_b64_digest = local_hashes[alg] | |
753 cloud_b64_digest = cloud_hashes[alg] | |
754 logger.debug( | |
755 'Comparing local vs cloud %s-checksum for %s. (%s/%s)', alg, file_name, | |
756 local_b64_digest, cloud_b64_digest) | |
757 if local_b64_digest != cloud_b64_digest: | |
758 | |
759 raise HashMismatchException( | |
760 '%s signature computed for local file (%s) doesn\'t match ' | |
761 'cloud-supplied digest (%s). %s (%s) will be deleted.' % ( | |
762 alg, local_b64_digest, cloud_b64_digest, | |
763 'Cloud object' if is_upload else 'Local file', | |
764 obj_url if is_upload else file_name)) | |
765 checked_one = True | |
766 if not checked_one: | |
767 if is_upload: | |
768 logger.warn( | |
769 'WARNING: Found no hashes to validate object uploaded to %s. ' | |
770 'Integrity cannot be assured without hashes.', obj_url) | |
771 else: | |
772 # One known way this can currently happen is when downloading objects larger | |
773 # than 5 GB from S3 (for which the etag is not an MD5). | |
774 logger.warn( | |
775 'WARNING: Found no hashes to validate object downloaded to %s. ' | |
776 'Integrity cannot be assured without hashes.', file_name) | |
777 | |
778 | |
779 def IsNoClobberServerException(e): | |
780 """Checks to see if the server attempted to clobber a file. | |
781 | |
782 In this case we specified via a precondition that we didn't want the file | |
783 clobbered. | |
784 | |
785 Args: | |
786 e: The Exception that was generated by a failed copy operation | |
787 | |
788 Returns: | |
789 bool indicator - True indicates that the server did attempt to clobber | |
790 an existing file. | |
791 """ | |
792 return ((isinstance(e, PreconditionException)) or | |
793 (isinstance(e, ResumableUploadException) and '412' in e.message)) | |
794 | |
795 | |
796 def CheckForDirFileConflict(exp_src_url, dst_url): | |
797 """Checks whether copying exp_src_url into dst_url is not possible. | |
798 | |
799 This happens if a directory exists in local file system where a file | |
800 needs to go or vice versa. In that case we print an error message and | |
801 exits. Example: if the file "./x" exists and you try to do: | |
802 gsutil cp gs://mybucket/x/y . | |
803 the request can't succeed because it requires a directory where | |
804 the file x exists. | |
805 | |
806 Note that we don't enforce any corresponding restrictions for buckets, | |
807 because the flat namespace semantics for buckets doesn't prohibit such | |
808 cases the way hierarchical file systems do. For example, if a bucket | |
809 contains an object called gs://bucket/dir and then you run the command: | |
810 gsutil cp file1 file2 gs://bucket/dir | |
811 you'll end up with objects gs://bucket/dir, gs://bucket/dir/file1, and | |
812 gs://bucket/dir/file2. | |
813 | |
814 Args: | |
815 exp_src_url: Expanded source StorageUrl. | |
816 dst_url: Destination StorageUrl. | |
817 | |
818 Raises: | |
819 CommandException: if errors encountered. | |
820 """ | |
821 if dst_url.IsCloudUrl(): | |
822 # The problem can only happen for file destination URLs. | |
823 return | |
824 dst_path = dst_url.object_name | |
825 final_dir = os.path.dirname(dst_path) | |
826 if os.path.isfile(final_dir): | |
827 raise CommandException('Cannot retrieve %s because a file exists ' | |
828 'where a directory needs to be created (%s).' % | |
829 (exp_src_url.url_string, final_dir)) | |
830 if os.path.isdir(dst_path): | |
831 raise CommandException('Cannot retrieve %s because a directory exists ' | |
832 '(%s) where the file needs to be created.' % | |
833 (exp_src_url.url_string, dst_path)) | |
834 | |
835 | |
836 def _PartitionFile(fp, file_size, src_url, content_type, canned_acl, | |
837 dst_bucket_url, random_prefix, tracker_file, | |
838 tracker_file_lock): | |
839 """Partitions a file into FilePart objects to be uploaded and later composed. | |
840 | |
841 These objects, when composed, will match the original file. This entails | |
842 splitting the file into parts, naming and forming a destination URL for each | |
843 part, and also providing the PerformParallelUploadFileToObjectArgs | |
844 corresponding to each part. | |
845 | |
846 Args: | |
847 fp: The file object to be partitioned. | |
848 file_size: The size of fp, in bytes. | |
849 src_url: Source FileUrl from the original command. | |
850 content_type: content type for the component and final objects. | |
851 canned_acl: The user-provided canned_acl, if applicable. | |
852 dst_bucket_url: CloudUrl for the destination bucket | |
853 random_prefix: The randomly-generated prefix used to prevent collisions | |
854 among the temporary component names. | |
855 tracker_file: The path to the parallel composite upload tracker file. | |
856 tracker_file_lock: The lock protecting access to the tracker file. | |
857 | |
858 Returns: | |
859 dst_args: The destination URIs for the temporary component objects. | |
860 """ | |
861 parallel_composite_upload_component_size = HumanReadableToBytes( | |
862 config.get('GSUtil', 'parallel_composite_upload_component_size', | |
863 DEFAULT_PARALLEL_COMPOSITE_UPLOAD_COMPONENT_SIZE)) | |
864 (num_components, component_size) = _GetPartitionInfo( | |
865 file_size, MAX_COMPOSE_ARITY, parallel_composite_upload_component_size) | |
866 | |
867 dst_args = {} # Arguments to create commands and pass to subprocesses. | |
868 file_names = [] # Used for the 2-step process of forming dst_args. | |
869 for i in range(num_components): | |
870 # "Salt" the object name with something a user is very unlikely to have | |
871 # used in an object name, then hash the extended name to make sure | |
872 # we don't run into problems with name length. Using a deterministic | |
873 # naming scheme for the temporary components allows users to take | |
874 # advantage of resumable uploads for each component. | |
875 encoded_name = (PARALLEL_UPLOAD_STATIC_SALT + fp.name).encode(UTF8) | |
876 content_md5 = md5() | |
877 content_md5.update(encoded_name) | |
878 digest = content_md5.hexdigest() | |
879 temp_file_name = (random_prefix + PARALLEL_UPLOAD_TEMP_NAMESPACE + | |
880 digest + '_' + str(i)) | |
881 tmp_dst_url = dst_bucket_url.Clone() | |
882 tmp_dst_url.object_name = temp_file_name | |
883 | |
884 if i < (num_components - 1): | |
885 # Every component except possibly the last is the same size. | |
886 file_part_length = component_size | |
887 else: | |
888 # The last component just gets all of the remaining bytes. | |
889 file_part_length = (file_size - ((num_components -1) * component_size)) | |
890 offset = i * component_size | |
891 func_args = PerformParallelUploadFileToObjectArgs( | |
892 fp.name, offset, file_part_length, src_url, tmp_dst_url, canned_acl, | |
893 content_type, tracker_file, tracker_file_lock) | |
894 file_names.append(temp_file_name) | |
895 dst_args[temp_file_name] = func_args | |
896 | |
897 return dst_args | |
898 | |
899 | |
900 def _DoParallelCompositeUpload(fp, src_url, dst_url, dst_obj_metadata, | |
901 canned_acl, file_size, preconditions, gsutil_api, | |
902 command_obj, copy_exception_handler): | |
903 """Uploads a local file to a cloud object using parallel composite upload. | |
904 | |
905 The file is partitioned into parts, and then the parts are uploaded in | |
906 parallel, composed to form the original destination object, and deleted. | |
907 | |
908 Args: | |
909 fp: The file object to be uploaded. | |
910 src_url: FileUrl representing the local file. | |
911 dst_url: CloudUrl representing the destination file. | |
912 dst_obj_metadata: apitools Object describing the destination object. | |
913 canned_acl: The canned acl to apply to the object, if any. | |
914 file_size: The size of the source file in bytes. | |
915 preconditions: Cloud API Preconditions for the final object. | |
916 gsutil_api: gsutil Cloud API instance to use. | |
917 command_obj: Command object (for calling Apply). | |
918 copy_exception_handler: Copy exception handler (for use in Apply). | |
919 | |
920 Returns: | |
921 Elapsed upload time, uploaded Object with generation, crc32c, and size | |
922 fields populated. | |
923 """ | |
924 start_time = time.time() | |
925 dst_bucket_url = StorageUrlFromString(dst_url.bucket_url_string) | |
926 api_selector = gsutil_api.GetApiSelector(provider=dst_url.scheme) | |
927 # Determine which components, if any, have already been successfully | |
928 # uploaded. | |
929 tracker_file = GetTrackerFilePath(dst_url, TrackerFileType.PARALLEL_UPLOAD, | |
930 api_selector, src_url) | |
931 tracker_file_lock = CreateLock() | |
932 (random_prefix, existing_components) = ( | |
933 _ParseParallelUploadTrackerFile(tracker_file, tracker_file_lock)) | |
934 | |
935 # Create the initial tracker file for the upload. | |
936 _CreateParallelUploadTrackerFile(tracker_file, random_prefix, | |
937 existing_components, tracker_file_lock) | |
938 | |
939 # Get the set of all components that should be uploaded. | |
940 dst_args = _PartitionFile( | |
941 fp, file_size, src_url, dst_obj_metadata.contentType, canned_acl, | |
942 dst_bucket_url, random_prefix, tracker_file, tracker_file_lock) | |
943 | |
944 (components_to_upload, existing_components, existing_objects_to_delete) = ( | |
945 FilterExistingComponents(dst_args, existing_components, dst_bucket_url, | |
946 gsutil_api)) | |
947 | |
948 # In parallel, copy all of the file parts that haven't already been | |
949 # uploaded to temporary objects. | |
950 cp_results = command_obj.Apply( | |
951 _PerformParallelUploadFileToObject, components_to_upload, | |
952 copy_exception_handler, ('op_failure_count', 'total_bytes_transferred'), | |
953 arg_checker=gslib.command.DummyArgChecker, | |
954 parallel_operations_override=True, should_return_results=True) | |
955 uploaded_components = [] | |
956 for cp_result in cp_results: | |
957 uploaded_components.append(cp_result[2]) | |
958 components = uploaded_components + existing_components | |
959 | |
960 if len(components) == len(dst_args): | |
961 # Only try to compose if all of the components were uploaded successfully. | |
962 | |
963 def _GetComponentNumber(component): | |
964 return int(component.object_name[component.object_name.rfind('_')+1:]) | |
965 # Sort the components so that they will be composed in the correct order. | |
966 components = sorted(components, key=_GetComponentNumber) | |
967 | |
968 request_components = [] | |
969 for component_url in components: | |
970 src_obj_metadata = ( | |
971 apitools_messages.ComposeRequest.SourceObjectsValueListEntry( | |
972 name=component_url.object_name)) | |
973 if component_url.HasGeneration(): | |
974 src_obj_metadata.generation = long(component_url.generation) | |
975 request_components.append(src_obj_metadata) | |
976 | |
977 composed_object = gsutil_api.ComposeObject( | |
978 request_components, dst_obj_metadata, preconditions=preconditions, | |
979 provider=dst_url.scheme, fields=['generation', 'crc32c', 'size']) | |
980 | |
981 try: | |
982 # Make sure only to delete things that we know were successfully | |
983 # uploaded (as opposed to all of the objects that we attempted to | |
984 # create) so that we don't delete any preexisting objects, except for | |
985 # those that were uploaded by a previous, failed run and have since | |
986 # changed (but still have an old generation lying around). | |
987 objects_to_delete = components + existing_objects_to_delete | |
988 command_obj.Apply(_DeleteObjectFn, objects_to_delete, _RmExceptionHandler, | |
989 arg_checker=gslib.command.DummyArgChecker, | |
990 parallel_operations_override=True) | |
991 except Exception: # pylint: disable=broad-except | |
992 # If some of the delete calls fail, don't cause the whole command to | |
993 # fail. The copy was successful iff the compose call succeeded, so | |
994 # reduce this to a warning. | |
995 logging.warning( | |
996 'Failed to delete some of the following temporary objects:\n' + | |
997 '\n'.join(dst_args.keys())) | |
998 finally: | |
999 with tracker_file_lock: | |
1000 if os.path.exists(tracker_file): | |
1001 os.unlink(tracker_file) | |
1002 else: | |
1003 # Some of the components failed to upload. In this case, we want to exit | |
1004 # without deleting the objects. | |
1005 raise CommandException( | |
1006 'Some temporary components were not uploaded successfully. ' | |
1007 'Please retry this upload.') | |
1008 | |
1009 elapsed_time = time.time() - start_time | |
1010 return elapsed_time, composed_object | |
1011 | |
1012 | |
1013 def _ShouldDoParallelCompositeUpload(logger, allow_splitting, src_url, dst_url, | |
1014 file_size, canned_acl=None): | |
1015 """Determines whether parallel composite upload strategy should be used. | |
1016 | |
1017 Args: | |
1018 logger: for outputting log messages. | |
1019 allow_splitting: If false, then this function returns false. | |
1020 src_url: FileUrl corresponding to a local file. | |
1021 dst_url: CloudUrl corresponding to destination cloud object. | |
1022 file_size: The size of the source file, in bytes. | |
1023 canned_acl: Canned ACL to apply to destination object, if any. | |
1024 | |
1025 Returns: | |
1026 True iff a parallel upload should be performed on the source file. | |
1027 """ | |
1028 global suggested_parallel_composites | |
1029 parallel_composite_upload_threshold = HumanReadableToBytes(config.get( | |
1030 'GSUtil', 'parallel_composite_upload_threshold', | |
1031 DEFAULT_PARALLEL_COMPOSITE_UPLOAD_THRESHOLD)) | |
1032 | |
1033 all_factors_but_size = ( | |
1034 allow_splitting # Don't split the pieces multiple times. | |
1035 and not src_url.IsStream() # We can't partition streams. | |
1036 and dst_url.scheme == 'gs' # Compose is only for gs. | |
1037 and not canned_acl) # TODO: Implement canned ACL support for compose. | |
1038 | |
1039 # Since parallel composite uploads are disabled by default, make user aware of | |
1040 # them. | |
1041 # TODO: Once compiled crcmod is being distributed by major Linux distributions | |
1042 # remove this check. | |
1043 if (all_factors_but_size and parallel_composite_upload_threshold == 0 | |
1044 and file_size >= PARALLEL_COMPOSITE_SUGGESTION_THRESHOLD | |
1045 and not suggested_parallel_composites): | |
1046 logger.info('\n'.join(textwrap.wrap( | |
1047 '==> NOTE: You are uploading one or more large file(s), which would ' | |
1048 'run significantly faster if you enable parallel composite uploads. ' | |
1049 'This feature can be enabled by editing the ' | |
1050 '"parallel_composite_upload_threshold" value in your .boto ' | |
1051 'configuration file. However, note that if you do this you and any ' | |
1052 'users that download such composite files will need to have a compiled ' | |
1053 'crcmod installed (see "gsutil help crcmod").')) + '\n') | |
1054 suggested_parallel_composites = True | |
1055 | |
1056 return (all_factors_but_size | |
1057 and parallel_composite_upload_threshold > 0 | |
1058 and file_size >= parallel_composite_upload_threshold) | |
1059 | |
1060 | |
1061 def ExpandUrlToSingleBlr(url_str, gsutil_api, debug, project_id, | |
1062 treat_nonexistent_object_as_subdir=False): | |
1063 """Expands wildcard if present in url_str. | |
1064 | |
1065 Args: | |
1066 url_str: String representation of requested url. | |
1067 gsutil_api: gsutil Cloud API instance to use. | |
1068 debug: debug level to use (for iterators). | |
1069 project_id: project ID to use (for iterators). | |
1070 treat_nonexistent_object_as_subdir: indicates if should treat a non-existent | |
1071 object as a subdir. | |
1072 | |
1073 Returns: | |
1074 (exp_url, have_existing_dst_container) | |
1075 where exp_url is a StorageUrl | |
1076 and have_existing_dst_container is a bool indicating whether | |
1077 exp_url names an existing directory, bucket, or bucket subdirectory. | |
1078 In the case where we match a subdirectory AND an object, the | |
1079 object is returned. | |
1080 | |
1081 Raises: | |
1082 CommandException: if url_str matched more than 1 URL. | |
1083 """ | |
1084 # Handle wildcarded url case. | |
1085 if ContainsWildcard(url_str): | |
1086 blr_expansion = list(CreateWildcardIterator(url_str, gsutil_api, | |
1087 debug=debug, | |
1088 project_id=project_id)) | |
1089 if len(blr_expansion) != 1: | |
1090 raise CommandException('Destination (%s) must match exactly 1 URL' % | |
1091 url_str) | |
1092 blr = blr_expansion[0] | |
1093 # BLR is either an OBJECT, PREFIX, or BUCKET; the latter two represent | |
1094 # directories. | |
1095 return (StorageUrlFromString(blr.url_string), not blr.IsObject()) | |
1096 | |
1097 storage_url = StorageUrlFromString(url_str) | |
1098 | |
1099 # Handle non-wildcarded URL. | |
1100 if storage_url.IsFileUrl(): | |
1101 return (storage_url, storage_url.IsDirectory()) | |
1102 | |
1103 # At this point we have a cloud URL. | |
1104 if storage_url.IsBucket(): | |
1105 return (storage_url, True) | |
1106 | |
1107 # For object/prefix URLs check 3 cases: (a) if the name ends with '/' treat | |
1108 # as a subdir; otherwise, use the wildcard iterator with url to | |
1109 # find if (b) there's a Prefix matching url, or (c) name is of form | |
1110 # dir_$folder$ (and in both these cases also treat dir as a subdir). | |
1111 # Cloud subdirs are always considered to be an existing container. | |
1112 if IsCloudSubdirPlaceholder(storage_url): | |
1113 return (storage_url, True) | |
1114 | |
1115 # Check for the special case where we have a folder marker object. | |
1116 folder_expansion = CreateWildcardIterator( | |
1117 storage_url.versionless_url_string + '_$folder$', gsutil_api, | |
1118 debug=debug, project_id=project_id).IterAll( | |
1119 bucket_listing_fields=['name']) | |
1120 for blr in folder_expansion: | |
1121 return (storage_url, True) | |
1122 | |
1123 blr_expansion = CreateWildcardIterator(url_str, gsutil_api, | |
1124 debug=debug, | |
1125 project_id=project_id).IterAll( | |
1126 bucket_listing_fields=['name']) | |
1127 expansion_empty = True | |
1128 for blr in blr_expansion: | |
1129 expansion_empty = False | |
1130 if blr.IsPrefix(): | |
1131 return (storage_url, True) | |
1132 | |
1133 return (storage_url, | |
1134 expansion_empty and treat_nonexistent_object_as_subdir) | |
1135 | |
1136 | |
1137 def FixWindowsNaming(src_url, dst_url): | |
1138 """Translates Windows pathnames to cloud pathnames. | |
1139 | |
1140 Rewrites the destination URL built by ConstructDstUrl(). | |
1141 | |
1142 Args: | |
1143 src_url: Source StorageUrl to be copied. | |
1144 dst_url: The destination StorageUrl built by ConstructDstUrl(). | |
1145 | |
1146 Returns: | |
1147 StorageUrl to use for copy. | |
1148 """ | |
1149 if (src_url.IsFileUrl() and src_url.delim == '\\' | |
1150 and dst_url.IsCloudUrl()): | |
1151 trans_url_str = re.sub(r'\\', '/', dst_url.url_string) | |
1152 dst_url = StorageUrlFromString(trans_url_str) | |
1153 return dst_url | |
1154 | |
1155 | |
1156 def SrcDstSame(src_url, dst_url): | |
1157 """Checks if src_url and dst_url represent the same object or file. | |
1158 | |
1159 We don't handle anything about hard or symbolic links. | |
1160 | |
1161 Args: | |
1162 src_url: Source StorageUrl. | |
1163 dst_url: Destination StorageUrl. | |
1164 | |
1165 Returns: | |
1166 Bool indicator. | |
1167 """ | |
1168 if src_url.IsFileUrl() and dst_url.IsFileUrl(): | |
1169 # Translate a/b/./c to a/b/c, so src=dst comparison below works. | |
1170 new_src_path = os.path.normpath(src_url.object_name) | |
1171 new_dst_path = os.path.normpath(dst_url.object_name) | |
1172 return new_src_path == new_dst_path | |
1173 else: | |
1174 return (src_url.url_string == dst_url.url_string and | |
1175 src_url.generation == dst_url.generation) | |
1176 | |
1177 | |
1178 def _LogCopyOperation(logger, src_url, dst_url, dst_obj_metadata): | |
1179 """Logs copy operation, including Content-Type if appropriate. | |
1180 | |
1181 Args: | |
1182 logger: logger instance to use for output. | |
1183 src_url: Source StorageUrl. | |
1184 dst_url: Destination StorageUrl. | |
1185 dst_obj_metadata: Object-specific metadata that should be overidden during | |
1186 the copy. | |
1187 """ | |
1188 if (dst_url.IsCloudUrl() and dst_obj_metadata and | |
1189 dst_obj_metadata.contentType): | |
1190 content_type_msg = ' [Content-Type=%s]' % dst_obj_metadata.contentType | |
1191 else: | |
1192 content_type_msg = '' | |
1193 if src_url.IsFileUrl() and src_url.IsStream(): | |
1194 logger.info('Copying from <STDIN>%s...', content_type_msg) | |
1195 else: | |
1196 logger.info('Copying %s%s...', src_url.url_string, content_type_msg) | |
1197 | |
1198 | |
1199 # pylint: disable=undefined-variable | |
1200 def _CopyObjToObjInTheCloud(src_url, src_obj_metadata, dst_url, | |
1201 dst_obj_metadata, preconditions, gsutil_api, | |
1202 logger): | |
1203 """Performs copy-in-the cloud from specified src to dest object. | |
1204 | |
1205 Args: | |
1206 src_url: Source CloudUrl. | |
1207 src_obj_metadata: Metadata for source object; must include etag and size. | |
1208 dst_url: Destination CloudUrl. | |
1209 dst_obj_metadata: Object-specific metadata that should be overidden during | |
1210 the copy. | |
1211 preconditions: Preconditions to use for the copy. | |
1212 gsutil_api: gsutil Cloud API instance to use for the copy. | |
1213 logger: logging.Logger for log message output. | |
1214 | |
1215 Returns: | |
1216 (elapsed_time, bytes_transferred, dst_url with generation, | |
1217 md5 hash of destination) excluding overhead like initial GET. | |
1218 | |
1219 Raises: | |
1220 CommandException: if errors encountered. | |
1221 """ | |
1222 start_time = time.time() | |
1223 | |
1224 progress_callback = FileProgressCallbackHandler( | |
1225 ConstructAnnounceText('Copying', dst_url.url_string), logger).call | |
1226 if global_copy_helper_opts.test_callback_file: | |
1227 with open(global_copy_helper_opts.test_callback_file, 'rb') as test_fp: | |
1228 progress_callback = pickle.loads(test_fp.read()).call | |
1229 dst_obj = gsutil_api.CopyObject( | |
1230 src_obj_metadata, dst_obj_metadata, src_generation=src_url.generation, | |
1231 canned_acl=global_copy_helper_opts.canned_acl, | |
1232 preconditions=preconditions, progress_callback=progress_callback, | |
1233 provider=dst_url.scheme, fields=UPLOAD_RETURN_FIELDS) | |
1234 | |
1235 end_time = time.time() | |
1236 | |
1237 result_url = dst_url.Clone() | |
1238 result_url.generation = GenerationFromUrlAndString(result_url, | |
1239 dst_obj.generation) | |
1240 | |
1241 return (end_time - start_time, src_obj_metadata.size, result_url, | |
1242 dst_obj.md5Hash) | |
1243 | |
1244 | |
1245 def _CheckFreeSpace(path): | |
1246 """Return path/drive free space (in bytes).""" | |
1247 if IS_WINDOWS: | |
1248 # pylint: disable=g-import-not-at-top | |
1249 try: | |
1250 # pylint: disable=invalid-name | |
1251 get_disk_free_space_ex = WINFUNCTYPE(c_int, c_wchar_p, | |
1252 POINTER(c_uint64), | |
1253 POINTER(c_uint64), | |
1254 POINTER(c_uint64)) | |
1255 get_disk_free_space_ex = get_disk_free_space_ex( | |
1256 ('GetDiskFreeSpaceExW', windll.kernel32), ( | |
1257 (1, 'lpszPathName'), | |
1258 (2, 'lpFreeUserSpace'), | |
1259 (2, 'lpTotalSpace'), | |
1260 (2, 'lpFreeSpace'),)) | |
1261 except AttributeError: | |
1262 get_disk_free_space_ex = WINFUNCTYPE(c_int, c_char_p, | |
1263 POINTER(c_uint64), | |
1264 POINTER(c_uint64), | |
1265 POINTER(c_uint64)) | |
1266 get_disk_free_space_ex = get_disk_free_space_ex( | |
1267 ('GetDiskFreeSpaceExA', windll.kernel32), ( | |
1268 (1, 'lpszPathName'), | |
1269 (2, 'lpFreeUserSpace'), | |
1270 (2, 'lpTotalSpace'), | |
1271 (2, 'lpFreeSpace'),)) | |
1272 | |
1273 def GetDiskFreeSpaceExErrCheck(result, unused_func, args): | |
1274 if not result: | |
1275 raise WinError() | |
1276 return args[1].value | |
1277 get_disk_free_space_ex.errcheck = GetDiskFreeSpaceExErrCheck | |
1278 | |
1279 return get_disk_free_space_ex(os.getenv('SystemDrive')) | |
1280 else: | |
1281 (_, f_frsize, _, _, f_bavail, _, _, _, _, _) = os.statvfs(path) | |
1282 return f_frsize * f_bavail | |
1283 | |
1284 | |
1285 def _SetContentTypeFromFile(src_url, dst_obj_metadata): | |
1286 """Detects and sets Content-Type if src_url names a local file. | |
1287 | |
1288 Args: | |
1289 src_url: Source StorageUrl. | |
1290 dst_obj_metadata: Object-specific metadata that should be overidden during | |
1291 the copy. | |
1292 """ | |
1293 # contentType == '' if user requested default type. | |
1294 if (dst_obj_metadata.contentType is None and src_url.IsFileUrl() | |
1295 and not src_url.IsStream()): | |
1296 # Only do content type recognition if src_url is a file. Object-to-object | |
1297 # copies with no -h Content-Type specified re-use the content type of the | |
1298 # source object. | |
1299 object_name = src_url.object_name | |
1300 content_type = None | |
1301 # Streams (denoted by '-') are expected to be 'application/octet-stream' | |
1302 # and 'file' would partially consume them. | |
1303 if object_name != '-': | |
1304 if config.getbool('GSUtil', 'use_magicfile', False): | |
1305 p = subprocess.Popen(['file', '--mime-type', object_name], | |
1306 stdout=subprocess.PIPE, stderr=subprocess.PIPE) | |
1307 output, error = p.communicate() | |
1308 p.stdout.close() | |
1309 p.stderr.close() | |
1310 if p.returncode != 0 or error: | |
1311 raise CommandException( | |
1312 'Encountered error running "file --mime-type %s" ' | |
1313 '(returncode=%d).\n%s' % (object_name, p.returncode, error)) | |
1314 # Parse output by removing line delimiter and splitting on last ": | |
1315 content_type = output.rstrip().rpartition(': ')[2] | |
1316 else: | |
1317 content_type = mimetypes.guess_type(object_name)[0] | |
1318 if not content_type: | |
1319 content_type = DEFAULT_CONTENT_TYPE | |
1320 dst_obj_metadata.contentType = content_type | |
1321 | |
1322 | |
1323 # pylint: disable=undefined-variable | |
1324 def _UploadFileToObjectNonResumable(src_url, src_obj_filestream, | |
1325 src_obj_size, dst_url, dst_obj_metadata, | |
1326 preconditions, gsutil_api, logger): | |
1327 """Uploads the file using a non-resumable strategy. | |
1328 | |
1329 Args: | |
1330 src_url: Source StorageUrl to upload. | |
1331 src_obj_filestream: File pointer to uploadable bytes. | |
1332 src_obj_size: Size of the source object. | |
1333 dst_url: Destination StorageUrl for the upload. | |
1334 dst_obj_metadata: Metadata for the target object. | |
1335 preconditions: Preconditions for the upload, if any. | |
1336 gsutil_api: gsutil Cloud API instance to use for the upload. | |
1337 logger: For outputting log messages. | |
1338 | |
1339 Returns: | |
1340 Elapsed upload time, uploaded Object with generation, md5, and size fields | |
1341 populated. | |
1342 """ | |
1343 progress_callback = FileProgressCallbackHandler( | |
1344 ConstructAnnounceText('Uploading', dst_url.url_string), logger).call | |
1345 if global_copy_helper_opts.test_callback_file: | |
1346 with open(global_copy_helper_opts.test_callback_file, 'rb') as test_fp: | |
1347 progress_callback = pickle.loads(test_fp.read()).call | |
1348 start_time = time.time() | |
1349 | |
1350 if src_url.IsStream(): | |
1351 # TODO: gsutil-beta: Provide progress callbacks for streaming uploads. | |
1352 uploaded_object = gsutil_api.UploadObjectStreaming( | |
1353 src_obj_filestream, object_metadata=dst_obj_metadata, | |
1354 canned_acl=global_copy_helper_opts.canned_acl, | |
1355 preconditions=preconditions, progress_callback=progress_callback, | |
1356 provider=dst_url.scheme, fields=UPLOAD_RETURN_FIELDS) | |
1357 else: | |
1358 uploaded_object = gsutil_api.UploadObject( | |
1359 src_obj_filestream, object_metadata=dst_obj_metadata, | |
1360 canned_acl=global_copy_helper_opts.canned_acl, size=src_obj_size, | |
1361 preconditions=preconditions, progress_callback=progress_callback, | |
1362 provider=dst_url.scheme, fields=UPLOAD_RETURN_FIELDS) | |
1363 end_time = time.time() | |
1364 elapsed_time = end_time - start_time | |
1365 | |
1366 return elapsed_time, uploaded_object | |
1367 | |
1368 | |
1369 # pylint: disable=undefined-variable | |
1370 def _UploadFileToObjectResumable(src_url, src_obj_filestream, | |
1371 src_obj_size, dst_url, dst_obj_metadata, | |
1372 preconditions, gsutil_api, logger): | |
1373 """Uploads the file using a resumable strategy. | |
1374 | |
1375 Args: | |
1376 src_url: Source FileUrl to upload. Must not be a stream. | |
1377 src_obj_filestream: File pointer to uploadable bytes. | |
1378 src_obj_size: Size of the source object. | |
1379 dst_url: Destination StorageUrl for the upload. | |
1380 dst_obj_metadata: Metadata for the target object. | |
1381 preconditions: Preconditions for the upload, if any. | |
1382 gsutil_api: gsutil Cloud API instance to use for the upload. | |
1383 logger: for outputting log messages. | |
1384 | |
1385 Returns: | |
1386 Elapsed upload time, uploaded Object with generation, md5, and size fields | |
1387 populated. | |
1388 """ | |
1389 tracker_file_name = GetTrackerFilePath( | |
1390 dst_url, TrackerFileType.UPLOAD, | |
1391 gsutil_api.GetApiSelector(provider=dst_url.scheme)) | |
1392 | |
1393 def _UploadTrackerCallback(serialization_data): | |
1394 """Creates a new tracker file for starting an upload from scratch. | |
1395 | |
1396 This function is called by the gsutil Cloud API implementation and the | |
1397 the serialization data is implementation-specific. | |
1398 | |
1399 Args: | |
1400 serialization_data: Serialization data used in resuming the upload. | |
1401 """ | |
1402 tracker_file = None | |
1403 try: | |
1404 tracker_file = open(tracker_file_name, 'w') | |
1405 tracker_file.write(str(serialization_data)) | |
1406 except IOError as e: | |
1407 RaiseUnwritableTrackerFileException(tracker_file_name, e.strerror) | |
1408 finally: | |
1409 if tracker_file: | |
1410 tracker_file.close() | |
1411 | |
1412 # This contains the upload URL, which will uniquely identify the | |
1413 # destination object. | |
1414 tracker_data = _GetUploadTrackerData(tracker_file_name, logger) | |
1415 if tracker_data: | |
1416 logger.info( | |
1417 'Resuming upload for %s', src_url.url_string) | |
1418 | |
1419 retryable = True | |
1420 | |
1421 progress_callback = FileProgressCallbackHandler( | |
1422 ConstructAnnounceText('Uploading', dst_url.url_string), logger).call | |
1423 if global_copy_helper_opts.test_callback_file: | |
1424 with open(global_copy_helper_opts.test_callback_file, 'rb') as test_fp: | |
1425 progress_callback = pickle.loads(test_fp.read()).call | |
1426 | |
1427 start_time = time.time() | |
1428 num_startover_attempts = 0 | |
1429 # This loop causes us to retry when the resumable upload failed in a way that | |
1430 # requires starting over with a new upload ID. Retries within a single upload | |
1431 # ID within the current process are handled in | |
1432 # gsutil_api.UploadObjectResumable, and retries within a single upload ID | |
1433 # spanning processes happens if an exception occurs not caught below (which | |
1434 # will leave the tracker file in place, and cause the upload ID to be reused | |
1435 # the next time the user runs gsutil and attempts the same upload). | |
1436 while retryable: | |
1437 try: | |
1438 uploaded_object = gsutil_api.UploadObjectResumable( | |
1439 src_obj_filestream, object_metadata=dst_obj_metadata, | |
1440 canned_acl=global_copy_helper_opts.canned_acl, | |
1441 preconditions=preconditions, provider=dst_url.scheme, | |
1442 size=src_obj_size, serialization_data=tracker_data, | |
1443 fields=UPLOAD_RETURN_FIELDS, | |
1444 tracker_callback=_UploadTrackerCallback, | |
1445 progress_callback=progress_callback) | |
1446 retryable = False | |
1447 except ResumableUploadStartOverException, e: | |
1448 # This can happen, for example, if the server sends a 410 response code. | |
1449 # In that case the current resumable upload ID can't be reused, so delete | |
1450 # the tracker file and try again up to max retries. | |
1451 num_startover_attempts += 1 | |
1452 retryable = (num_startover_attempts < GetNumRetries()) | |
1453 if not retryable: | |
1454 raise | |
1455 | |
1456 # If the server sends a 404 response code, then the upload should only | |
1457 # be restarted if it was the object (and not the bucket) that was missing. | |
1458 try: | |
1459 gsutil_api.GetBucket(dst_obj_metadata.bucket, provider=dst_url.scheme) | |
1460 except NotFoundException: | |
1461 raise | |
1462 | |
1463 logger.info('Restarting upload from scratch after exception %s', e) | |
1464 DeleteTrackerFile(tracker_file_name) | |
1465 tracker_data = None | |
1466 src_obj_filestream.seek(0) | |
1467 # Reset the progress callback handler. | |
1468 progress_callback = FileProgressCallbackHandler( | |
1469 ConstructAnnounceText('Uploading', dst_url.url_string), logger).call | |
1470 logger.info('\n'.join(textwrap.wrap( | |
1471 'Resumable upload of %s failed with a response code indicating we ' | |
1472 'need to start over with a new resumable upload ID. Backing off ' | |
1473 'and retrying.' % src_url.url_string))) | |
1474 time.sleep(min(random.random() * (2 ** num_startover_attempts), | |
1475 GetMaxRetryDelay())) | |
1476 except ResumableUploadAbortException: | |
1477 retryable = False | |
1478 raise | |
1479 finally: | |
1480 if not retryable: | |
1481 DeleteTrackerFile(tracker_file_name) | |
1482 | |
1483 end_time = time.time() | |
1484 elapsed_time = end_time - start_time | |
1485 | |
1486 return (elapsed_time, uploaded_object) | |
1487 | |
1488 | |
1489 def _CompressFileForUpload(src_url, src_obj_filestream, src_obj_size, logger): | |
1490 """Compresses a to-be-uploaded local file to save bandwidth. | |
1491 | |
1492 Args: | |
1493 src_url: Source FileUrl. | |
1494 src_obj_filestream: Read stream of the source file - will be consumed | |
1495 and closed. | |
1496 src_obj_size: Size of the source file. | |
1497 logger: for outputting log messages. | |
1498 | |
1499 Returns: | |
1500 StorageUrl path to compressed file, compressed file size. | |
1501 """ | |
1502 # TODO: Compress using a streaming model as opposed to all at once here. | |
1503 if src_obj_size >= MIN_SIZE_COMPUTE_LOGGING: | |
1504 logger.info( | |
1505 'Compressing %s (to tmp)...', src_url) | |
1506 (gzip_fh, gzip_path) = tempfile.mkstemp() | |
1507 gzip_fp = None | |
1508 try: | |
1509 # Check for temp space. Assume the compressed object is at most 2x | |
1510 # the size of the object (normally should compress to smaller than | |
1511 # the object) | |
1512 if _CheckFreeSpace(gzip_path) < 2*int(src_obj_size): | |
1513 raise CommandException('Inadequate temp space available to compress ' | |
1514 '%s. See the CHANGING TEMP DIRECTORIES section ' | |
1515 'of "gsutil help cp" for more info.' % src_url) | |
1516 gzip_fp = gzip.open(gzip_path, 'wb') | |
1517 data = src_obj_filestream.read(GZIP_CHUNK_SIZE) | |
1518 while data: | |
1519 gzip_fp.write(data) | |
1520 data = src_obj_filestream.read(GZIP_CHUNK_SIZE) | |
1521 finally: | |
1522 if gzip_fp: | |
1523 gzip_fp.close() | |
1524 os.close(gzip_fh) | |
1525 src_obj_filestream.close() | |
1526 gzip_size = os.path.getsize(gzip_path) | |
1527 return StorageUrlFromString(gzip_path), gzip_size | |
1528 | |
1529 | |
1530 def _UploadFileToObject(src_url, src_obj_filestream, src_obj_size, | |
1531 dst_url, dst_obj_metadata, preconditions, gsutil_api, | |
1532 logger, command_obj, copy_exception_handler, | |
1533 gzip_exts=None, allow_splitting=True): | |
1534 """Uploads a local file to an object. | |
1535 | |
1536 Args: | |
1537 src_url: Source FileUrl. | |
1538 src_obj_filestream: Read stream of the source file to be read and closed. | |
1539 src_obj_size: Size of the source file. | |
1540 dst_url: Destination CloudUrl. | |
1541 dst_obj_metadata: Metadata to be applied to the destination object. | |
1542 preconditions: Preconditions to use for the copy. | |
1543 gsutil_api: gsutil Cloud API to use for the copy. | |
1544 logger: for outputting log messages. | |
1545 command_obj: command object for use in Apply in parallel composite uploads. | |
1546 copy_exception_handler: For handling copy exceptions during Apply. | |
1547 gzip_exts: List of file extensions to gzip prior to upload, if any. | |
1548 allow_splitting: Whether to allow the file to be split into component | |
1549 pieces for an parallel composite upload. | |
1550 | |
1551 Returns: | |
1552 (elapsed_time, bytes_transferred, dst_url with generation, | |
1553 md5 hash of destination) excluding overhead like initial GET. | |
1554 | |
1555 Raises: | |
1556 CommandException: if errors encountered. | |
1557 """ | |
1558 if not dst_obj_metadata or not dst_obj_metadata.contentLanguage: | |
1559 content_language = config.get_value('GSUtil', 'content_language') | |
1560 if content_language: | |
1561 dst_obj_metadata.contentLanguage = content_language | |
1562 | |
1563 fname_parts = src_url.object_name.split('.') | |
1564 upload_url = src_url | |
1565 upload_stream = src_obj_filestream | |
1566 upload_size = src_obj_size | |
1567 zipped_file = False | |
1568 if gzip_exts and len(fname_parts) > 1 and fname_parts[-1] in gzip_exts: | |
1569 upload_url, upload_size = _CompressFileForUpload( | |
1570 src_url, src_obj_filestream, src_obj_size, logger) | |
1571 upload_stream = open(upload_url.object_name, 'rb') | |
1572 dst_obj_metadata.contentEncoding = 'gzip' | |
1573 zipped_file = True | |
1574 | |
1575 elapsed_time = None | |
1576 uploaded_object = None | |
1577 hash_algs = GetUploadHashAlgs() | |
1578 digesters = dict((alg, hash_algs[alg]()) for alg in hash_algs or {}) | |
1579 | |
1580 parallel_composite_upload = _ShouldDoParallelCompositeUpload( | |
1581 logger, allow_splitting, upload_url, dst_url, src_obj_size, | |
1582 canned_acl=global_copy_helper_opts.canned_acl) | |
1583 | |
1584 if (src_url.IsStream() and | |
1585 gsutil_api.GetApiSelector(provider=dst_url.scheme) == ApiSelector.JSON): | |
1586 orig_stream = upload_stream | |
1587 # Add limited seekable properties to the stream via buffering. | |
1588 upload_stream = ResumableStreamingJsonUploadWrapper( | |
1589 orig_stream, GetJsonResumableChunkSize()) | |
1590 | |
1591 if not parallel_composite_upload and len(hash_algs): | |
1592 # Parallel composite uploads calculate hashes per-component in subsequent | |
1593 # calls to this function, but the composition of the final object is a | |
1594 # cloud-only operation. | |
1595 wrapped_filestream = HashingFileUploadWrapper(upload_stream, digesters, | |
1596 hash_algs, upload_url, logger) | |
1597 else: | |
1598 wrapped_filestream = upload_stream | |
1599 | |
1600 try: | |
1601 if parallel_composite_upload: | |
1602 elapsed_time, uploaded_object = _DoParallelCompositeUpload( | |
1603 upload_stream, upload_url, dst_url, dst_obj_metadata, | |
1604 global_copy_helper_opts.canned_acl, upload_size, preconditions, | |
1605 gsutil_api, command_obj, copy_exception_handler) | |
1606 elif upload_size < ResumableThreshold() or src_url.IsStream(): | |
1607 elapsed_time, uploaded_object = _UploadFileToObjectNonResumable( | |
1608 upload_url, wrapped_filestream, upload_size, dst_url, | |
1609 dst_obj_metadata, preconditions, gsutil_api, logger) | |
1610 else: | |
1611 elapsed_time, uploaded_object = _UploadFileToObjectResumable( | |
1612 upload_url, wrapped_filestream, upload_size, dst_url, | |
1613 dst_obj_metadata, preconditions, gsutil_api, logger) | |
1614 | |
1615 finally: | |
1616 if zipped_file: | |
1617 try: | |
1618 os.unlink(upload_url.object_name) | |
1619 # Windows sometimes complains the temp file is locked when you try to | |
1620 # delete it. | |
1621 except Exception: # pylint: disable=broad-except | |
1622 logger.warning( | |
1623 'Could not delete %s. This can occur in Windows because the ' | |
1624 'temporary file is still locked.', upload_url.object_name) | |
1625 # In the gzip case, this is the gzip stream. _CompressFileForUpload will | |
1626 # have already closed the original source stream. | |
1627 upload_stream.close() | |
1628 | |
1629 if not parallel_composite_upload: | |
1630 try: | |
1631 digests = _CreateDigestsFromDigesters(digesters) | |
1632 _CheckHashes(logger, dst_url, uploaded_object, src_url.object_name, | |
1633 digests, is_upload=True) | |
1634 except HashMismatchException: | |
1635 if _RENAME_ON_HASH_MISMATCH: | |
1636 corrupted_obj_metadata = apitools_messages.Object( | |
1637 name=dst_obj_metadata.name, | |
1638 bucket=dst_obj_metadata.bucket, | |
1639 etag=uploaded_object.etag) | |
1640 dst_obj_metadata.name = (dst_url.object_name + | |
1641 _RENAME_ON_HASH_MISMATCH_SUFFIX) | |
1642 gsutil_api.CopyObject(corrupted_obj_metadata, | |
1643 dst_obj_metadata, provider=dst_url.scheme) | |
1644 # If the digest doesn't match, delete the object. | |
1645 gsutil_api.DeleteObject(dst_url.bucket_name, dst_url.object_name, | |
1646 generation=uploaded_object.generation, | |
1647 provider=dst_url.scheme) | |
1648 raise | |
1649 | |
1650 result_url = dst_url.Clone() | |
1651 | |
1652 result_url.generation = uploaded_object.generation | |
1653 result_url.generation = GenerationFromUrlAndString( | |
1654 result_url, uploaded_object.generation) | |
1655 | |
1656 return (elapsed_time, uploaded_object.size, result_url, | |
1657 uploaded_object.md5Hash) | |
1658 | |
1659 | |
1660 # TODO: Refactor this long function into smaller pieces. | |
1661 # pylint: disable=too-many-statements | |
1662 def _DownloadObjectToFile(src_url, src_obj_metadata, dst_url, | |
1663 gsutil_api, logger, test_method=None): | |
1664 """Downloads an object to a local file. | |
1665 | |
1666 Args: | |
1667 src_url: Source CloudUrl. | |
1668 src_obj_metadata: Metadata from the source object. | |
1669 dst_url: Destination FileUrl. | |
1670 gsutil_api: gsutil Cloud API instance to use for the download. | |
1671 logger: for outputting log messages. | |
1672 test_method: Optional test method for modifying the file before validation | |
1673 during unit tests. | |
1674 Returns: | |
1675 (elapsed_time, bytes_transferred, dst_url, md5), excluding overhead like | |
1676 initial GET. | |
1677 | |
1678 Raises: | |
1679 CommandException: if errors encountered. | |
1680 """ | |
1681 global open_files_map | |
1682 file_name = dst_url.object_name | |
1683 dir_name = os.path.dirname(file_name) | |
1684 if dir_name and not os.path.exists(dir_name): | |
1685 # Do dir creation in try block so can ignore case where dir already | |
1686 # exists. This is needed to avoid a race condition when running gsutil | |
1687 # -m cp. | |
1688 try: | |
1689 os.makedirs(dir_name) | |
1690 except OSError, e: | |
1691 if e.errno != errno.EEXIST: | |
1692 raise | |
1693 api_selector = gsutil_api.GetApiSelector(provider=src_url.scheme) | |
1694 # For gzipped objects download to a temp file and unzip. For the XML API, | |
1695 # the represents the result of a HEAD request. For the JSON API, this is | |
1696 # the stored encoding which the service may not respect. However, if the | |
1697 # server sends decompressed bytes for a file that is stored compressed | |
1698 # (double compressed case), there is no way we can validate the hash and | |
1699 # we will fail our hash check for the object. | |
1700 if (src_obj_metadata.contentEncoding and | |
1701 src_obj_metadata.contentEncoding.lower().endswith('gzip')): | |
1702 # We can't use tempfile.mkstemp() here because we need a predictable | |
1703 # filename for resumable downloads. | |
1704 download_file_name = _GetDownloadZipFileName(file_name) | |
1705 logger.info( | |
1706 'Downloading to temp gzip filename %s', download_file_name) | |
1707 need_to_unzip = True | |
1708 else: | |
1709 download_file_name = file_name | |
1710 need_to_unzip = False | |
1711 | |
1712 if download_file_name.endswith(dst_url.delim): | |
1713 logger.warn('\n'.join(textwrap.wrap( | |
1714 'Skipping attempt to download to filename ending with slash (%s). This ' | |
1715 'typically happens when using gsutil to download from a subdirectory ' | |
1716 'created by the Cloud Console (https://cloud.google.com/console)' | |
1717 % download_file_name))) | |
1718 return (0, 0, dst_url, '') | |
1719 | |
1720 # Set up hash digesters. | |
1721 hash_algs = GetDownloadHashAlgs( | |
1722 logger, src_has_md5=src_obj_metadata.md5Hash, | |
1723 src_has_crc32c=src_obj_metadata.crc32c) | |
1724 digesters = dict((alg, hash_algs[alg]()) for alg in hash_algs or {}) | |
1725 | |
1726 fp = None | |
1727 # Tracks whether the server used a gzip encoding. | |
1728 server_encoding = None | |
1729 download_complete = False | |
1730 download_strategy = _SelectDownloadStrategy(dst_url) | |
1731 download_start_point = 0 | |
1732 # This is used for resuming downloads, but also for passing the mediaLink | |
1733 # and size into the download for new downloads so that we can avoid | |
1734 # making an extra HTTP call. | |
1735 serialization_data = None | |
1736 serialization_dict = GetDownloadSerializationDict(src_obj_metadata) | |
1737 open_files = [] | |
1738 try: | |
1739 if download_strategy is CloudApi.DownloadStrategy.ONE_SHOT: | |
1740 fp = open(download_file_name, 'wb') | |
1741 elif download_strategy is CloudApi.DownloadStrategy.RESUMABLE: | |
1742 # If this is a resumable download, we need to open the file for append and | |
1743 # manage a tracker file. | |
1744 if open_files_map.get(download_file_name, False): | |
1745 # Ensure another process/thread is not already writing to this file. | |
1746 raise FileConcurrencySkipError | |
1747 open_files.append(download_file_name) | |
1748 open_files_map[download_file_name] = True | |
1749 fp = open(download_file_name, 'ab') | |
1750 | |
1751 resuming = ReadOrCreateDownloadTrackerFile( | |
1752 src_obj_metadata, dst_url, api_selector) | |
1753 if resuming: | |
1754 # Find out how far along we are so we can request the appropriate | |
1755 # remaining range of the object. | |
1756 existing_file_size = GetFileSize(fp, position_to_eof=True) | |
1757 if existing_file_size > src_obj_metadata.size: | |
1758 DeleteTrackerFile(GetTrackerFilePath( | |
1759 dst_url, TrackerFileType.DOWNLOAD, api_selector)) | |
1760 raise CommandException( | |
1761 '%s is larger (%d) than %s (%d).\nDeleting tracker file, so ' | |
1762 'if you re-try this download it will start from scratch' % | |
1763 (download_file_name, existing_file_size, src_url.object_name, | |
1764 src_obj_metadata.size)) | |
1765 else: | |
1766 if existing_file_size == src_obj_metadata.size: | |
1767 logger.info( | |
1768 'Download already complete for file %s, skipping download but ' | |
1769 'will run integrity checks.', download_file_name) | |
1770 download_complete = True | |
1771 else: | |
1772 download_start_point = existing_file_size | |
1773 serialization_dict['progress'] = download_start_point | |
1774 logger.info('Resuming download for %s', src_url.url_string) | |
1775 # Catch up our digester with the hash data. | |
1776 if existing_file_size > TEN_MIB: | |
1777 for alg_name in digesters: | |
1778 logger.info( | |
1779 'Catching up %s for %s', alg_name, download_file_name) | |
1780 with open(download_file_name, 'rb') as hash_fp: | |
1781 while True: | |
1782 data = hash_fp.read(DEFAULT_FILE_BUFFER_SIZE) | |
1783 if not data: | |
1784 break | |
1785 for alg_name in digesters: | |
1786 digesters[alg_name].update(data) | |
1787 else: | |
1788 # Starting a new download, blow away whatever is already there. | |
1789 fp.truncate(0) | |
1790 fp.seek(0) | |
1791 | |
1792 else: | |
1793 raise CommandException('Invalid download strategy %s chosen for' | |
1794 'file %s' % (download_strategy, fp.name)) | |
1795 | |
1796 if not dst_url.IsStream(): | |
1797 serialization_data = json.dumps(serialization_dict) | |
1798 | |
1799 progress_callback = FileProgressCallbackHandler( | |
1800 ConstructAnnounceText('Downloading', dst_url.url_string), | |
1801 logger).call | |
1802 if global_copy_helper_opts.test_callback_file: | |
1803 with open(global_copy_helper_opts.test_callback_file, 'rb') as test_fp: | |
1804 progress_callback = pickle.loads(test_fp.read()).call | |
1805 | |
1806 start_time = time.time() | |
1807 # TODO: With gzip encoding (which may occur on-the-fly and not be part of | |
1808 # the object's metadata), when we request a range to resume, it's possible | |
1809 # that the server will just resend the entire object, which means our | |
1810 # caught-up hash will be incorrect. We recalculate the hash on | |
1811 # the local file in the case of a failed gzip hash anyway, but it would | |
1812 # be better if we actively detected this case. | |
1813 if not download_complete: | |
1814 server_encoding = gsutil_api.GetObjectMedia( | |
1815 src_url.bucket_name, src_url.object_name, fp, | |
1816 start_byte=download_start_point, generation=src_url.generation, | |
1817 object_size=src_obj_metadata.size, | |
1818 download_strategy=download_strategy, provider=src_url.scheme, | |
1819 serialization_data=serialization_data, digesters=digesters, | |
1820 progress_callback=progress_callback) | |
1821 | |
1822 end_time = time.time() | |
1823 | |
1824 # If a custom test method is defined, call it here. For the copy command, | |
1825 # test methods are expected to take one argument: an open file pointer, | |
1826 # and are used to perturb the open file during download to exercise | |
1827 # download error detection. | |
1828 if test_method: | |
1829 test_method(fp) | |
1830 | |
1831 except ResumableDownloadException as e: | |
1832 logger.warning('Caught ResumableDownloadException (%s) for file %s.', | |
1833 e.reason, file_name) | |
1834 raise | |
1835 | |
1836 finally: | |
1837 if fp: | |
1838 fp.close() | |
1839 for file_name in open_files: | |
1840 open_files_map.delete(file_name) | |
1841 | |
1842 # If we decompressed a content-encoding gzip file on the fly, this may not | |
1843 # be accurate, but it is the best we can do without going deep into the | |
1844 # underlying HTTP libraries. Note that this value is only used for | |
1845 # reporting in log messages; inaccuracy doesn't impact the integrity of the | |
1846 # download. | |
1847 bytes_transferred = src_obj_metadata.size - download_start_point | |
1848 server_gzip = server_encoding and server_encoding.lower().endswith('gzip') | |
1849 local_md5 = _ValidateDownloadHashes(logger, src_url, src_obj_metadata, | |
1850 dst_url, need_to_unzip, server_gzip, | |
1851 digesters, hash_algs, api_selector, | |
1852 bytes_transferred) | |
1853 | |
1854 return (end_time - start_time, bytes_transferred, dst_url, local_md5) | |
1855 | |
1856 | |
1857 def _GetDownloadZipFileName(file_name): | |
1858 """Returns the file name for a temporarily compressed downloaded file.""" | |
1859 return '%s_.gztmp' % file_name | |
1860 | |
1861 | |
1862 def _ValidateDownloadHashes(logger, src_url, src_obj_metadata, dst_url, | |
1863 need_to_unzip, server_gzip, digesters, hash_algs, | |
1864 api_selector, bytes_transferred): | |
1865 """Validates a downloaded file's integrity. | |
1866 | |
1867 Args: | |
1868 logger: For outputting log messages. | |
1869 src_url: StorageUrl for the source object. | |
1870 src_obj_metadata: Metadata for the source object, potentially containing | |
1871 hash values. | |
1872 dst_url: StorageUrl describing the destination file. | |
1873 need_to_unzip: If true, a temporary zip file was used and must be | |
1874 uncompressed as part of validation. | |
1875 server_gzip: If true, the server gzipped the bytes (regardless of whether | |
1876 the object metadata claimed it was gzipped). | |
1877 digesters: dict of {string, hash digester} that contains up-to-date digests | |
1878 computed during the download. If a digester for a particular | |
1879 algorithm is None, an up-to-date digest is not available and the | |
1880 hash must be recomputed from the local file. | |
1881 hash_algs: dict of {string, hash algorithm} that can be used if digesters | |
1882 don't have up-to-date digests. | |
1883 api_selector: The Cloud API implementation used (used tracker file naming). | |
1884 bytes_transferred: Number of bytes downloaded (used for logging). | |
1885 | |
1886 Returns: | |
1887 An MD5 of the local file, if one was calculated as part of the integrity | |
1888 check. | |
1889 """ | |
1890 file_name = dst_url.object_name | |
1891 download_file_name = (_GetDownloadZipFileName(file_name) if need_to_unzip else | |
1892 file_name) | |
1893 digesters_succeeded = True | |
1894 for alg in digesters: | |
1895 # If we get a digester with a None algorithm, the underlying | |
1896 # implementation failed to calculate a digest, so we will need to | |
1897 # calculate one from scratch. | |
1898 if not digesters[alg]: | |
1899 digesters_succeeded = False | |
1900 break | |
1901 | |
1902 if digesters_succeeded: | |
1903 local_hashes = _CreateDigestsFromDigesters(digesters) | |
1904 else: | |
1905 local_hashes = _CreateDigestsFromLocalFile( | |
1906 logger, hash_algs, download_file_name, src_obj_metadata) | |
1907 | |
1908 digest_verified = True | |
1909 hash_invalid_exception = None | |
1910 try: | |
1911 _CheckHashes(logger, src_url, src_obj_metadata, download_file_name, | |
1912 local_hashes) | |
1913 DeleteTrackerFile(GetTrackerFilePath( | |
1914 dst_url, TrackerFileType.DOWNLOAD, api_selector)) | |
1915 except HashMismatchException, e: | |
1916 # If an non-gzipped object gets sent with gzip content encoding, the hash | |
1917 # we calculate will match the gzipped bytes, not the original object. Thus, | |
1918 # we'll need to calculate and check it after unzipping. | |
1919 if server_gzip: | |
1920 logger.debug( | |
1921 'Hash did not match but server gzipped the content, will ' | |
1922 'recalculate.') | |
1923 digest_verified = False | |
1924 elif api_selector == ApiSelector.XML: | |
1925 logger.debug( | |
1926 'Hash did not match but server may have gzipped the content, will ' | |
1927 'recalculate.') | |
1928 # Save off the exception in case this isn't a gzipped file. | |
1929 hash_invalid_exception = e | |
1930 digest_verified = False | |
1931 else: | |
1932 DeleteTrackerFile(GetTrackerFilePath( | |
1933 dst_url, TrackerFileType.DOWNLOAD, api_selector)) | |
1934 if _RENAME_ON_HASH_MISMATCH: | |
1935 os.rename(download_file_name, | |
1936 download_file_name + _RENAME_ON_HASH_MISMATCH_SUFFIX) | |
1937 else: | |
1938 os.unlink(download_file_name) | |
1939 raise | |
1940 | |
1941 if server_gzip and not need_to_unzip: | |
1942 # Server compressed bytes on-the-fly, thus we need to rename and decompress. | |
1943 # We can't decompress on-the-fly because prior to Python 3.2 the gzip | |
1944 # module makes a bunch of seek calls on the stream. | |
1945 download_file_name = _GetDownloadZipFileName(file_name) | |
1946 os.rename(file_name, download_file_name) | |
1947 | |
1948 if need_to_unzip or server_gzip: | |
1949 # Log that we're uncompressing if the file is big enough that | |
1950 # decompressing would make it look like the transfer "stalled" at the end. | |
1951 if bytes_transferred > TEN_MIB: | |
1952 logger.info( | |
1953 'Uncompressing downloaded tmp file to %s...', file_name) | |
1954 | |
1955 # Downloaded gzipped file to a filename w/o .gz extension, so unzip. | |
1956 gzip_fp = None | |
1957 try: | |
1958 gzip_fp = gzip.open(download_file_name, 'rb') | |
1959 with open(file_name, 'wb') as f_out: | |
1960 data = gzip_fp.read(GZIP_CHUNK_SIZE) | |
1961 while data: | |
1962 f_out.write(data) | |
1963 data = gzip_fp.read(GZIP_CHUNK_SIZE) | |
1964 except IOError, e: | |
1965 # In the XML case where we don't know if the file was gzipped, raise | |
1966 # the original hash exception if we find that it wasn't. | |
1967 if 'Not a gzipped file' in str(e) and hash_invalid_exception: | |
1968 # Linter improperly thinks we're raising None despite the above check. | |
1969 # pylint: disable=raising-bad-type | |
1970 raise hash_invalid_exception | |
1971 finally: | |
1972 if gzip_fp: | |
1973 gzip_fp.close() | |
1974 | |
1975 os.unlink(download_file_name) | |
1976 | |
1977 if not digest_verified: | |
1978 try: | |
1979 # Recalculate hashes on the unzipped local file. | |
1980 local_hashes = _CreateDigestsFromLocalFile(logger, hash_algs, file_name, | |
1981 src_obj_metadata) | |
1982 _CheckHashes(logger, src_url, src_obj_metadata, file_name, local_hashes) | |
1983 DeleteTrackerFile(GetTrackerFilePath( | |
1984 dst_url, TrackerFileType.DOWNLOAD, api_selector)) | |
1985 except HashMismatchException: | |
1986 DeleteTrackerFile(GetTrackerFilePath( | |
1987 dst_url, TrackerFileType.DOWNLOAD, api_selector)) | |
1988 if _RENAME_ON_HASH_MISMATCH: | |
1989 os.rename(file_name, | |
1990 file_name + _RENAME_ON_HASH_MISMATCH_SUFFIX) | |
1991 else: | |
1992 os.unlink(file_name) | |
1993 raise | |
1994 | |
1995 if 'md5' in local_hashes: | |
1996 return local_hashes['md5'] | |
1997 | |
1998 | |
1999 def _CopyFileToFile(src_url, dst_url): | |
2000 """Copies a local file to a local file. | |
2001 | |
2002 Args: | |
2003 src_url: Source FileUrl. | |
2004 dst_url: Destination FileUrl. | |
2005 Returns: | |
2006 (elapsed_time, bytes_transferred, dst_url, md5=None). | |
2007 | |
2008 Raises: | |
2009 CommandException: if errors encountered. | |
2010 """ | |
2011 src_fp = GetStreamFromFileUrl(src_url) | |
2012 dir_name = os.path.dirname(dst_url.object_name) | |
2013 if dir_name and not os.path.exists(dir_name): | |
2014 os.makedirs(dir_name) | |
2015 dst_fp = open(dst_url.object_name, 'wb') | |
2016 start_time = time.time() | |
2017 shutil.copyfileobj(src_fp, dst_fp) | |
2018 end_time = time.time() | |
2019 return (end_time - start_time, os.path.getsize(dst_url.object_name), | |
2020 dst_url, None) | |
2021 | |
2022 | |
2023 def _DummyTrackerCallback(_): | |
2024 pass | |
2025 | |
2026 | |
2027 # pylint: disable=undefined-variable | |
2028 def _CopyObjToObjDaisyChainMode(src_url, src_obj_metadata, dst_url, | |
2029 dst_obj_metadata, preconditions, gsutil_api, | |
2030 logger): | |
2031 """Copies from src_url to dst_url in "daisy chain" mode. | |
2032 | |
2033 See -D OPTION documentation about what daisy chain mode is. | |
2034 | |
2035 Args: | |
2036 src_url: Source CloudUrl | |
2037 src_obj_metadata: Metadata from source object | |
2038 dst_url: Destination CloudUrl | |
2039 dst_obj_metadata: Object-specific metadata that should be overidden during | |
2040 the copy. | |
2041 preconditions: Preconditions to use for the copy. | |
2042 gsutil_api: gsutil Cloud API to use for the copy. | |
2043 logger: For outputting log messages. | |
2044 | |
2045 Returns: | |
2046 (elapsed_time, bytes_transferred, dst_url with generation, | |
2047 md5 hash of destination) excluding overhead like initial GET. | |
2048 | |
2049 Raises: | |
2050 CommandException: if errors encountered. | |
2051 """ | |
2052 # We don't attempt to preserve ACLs across providers because | |
2053 # GCS and S3 support different ACLs and disjoint principals. | |
2054 if (global_copy_helper_opts.preserve_acl | |
2055 and src_url.scheme != dst_url.scheme): | |
2056 raise NotImplementedError( | |
2057 'Cross-provider cp -p not supported') | |
2058 if not global_copy_helper_opts.preserve_acl: | |
2059 dst_obj_metadata.acl = [] | |
2060 | |
2061 # Don't use callbacks for downloads on the daisy chain wrapper because | |
2062 # upload callbacks will output progress, but respect test hooks if present. | |
2063 progress_callback = None | |
2064 if global_copy_helper_opts.test_callback_file: | |
2065 with open(global_copy_helper_opts.test_callback_file, 'rb') as test_fp: | |
2066 progress_callback = pickle.loads(test_fp.read()).call | |
2067 | |
2068 start_time = time.time() | |
2069 upload_fp = DaisyChainWrapper(src_url, src_obj_metadata.size, gsutil_api, | |
2070 progress_callback=progress_callback) | |
2071 uploaded_object = None | |
2072 if src_obj_metadata.size == 0: | |
2073 # Resumable uploads of size 0 are not supported. | |
2074 uploaded_object = gsutil_api.UploadObject( | |
2075 upload_fp, object_metadata=dst_obj_metadata, | |
2076 canned_acl=global_copy_helper_opts.canned_acl, | |
2077 preconditions=preconditions, provider=dst_url.scheme, | |
2078 fields=UPLOAD_RETURN_FIELDS, size=src_obj_metadata.size) | |
2079 else: | |
2080 # TODO: Support process-break resumes. This will resume across connection | |
2081 # breaks and server errors, but the tracker callback is a no-op so this | |
2082 # won't resume across gsutil runs. | |
2083 # TODO: Test retries via test_callback_file. | |
2084 uploaded_object = gsutil_api.UploadObjectResumable( | |
2085 upload_fp, object_metadata=dst_obj_metadata, | |
2086 canned_acl=global_copy_helper_opts.canned_acl, | |
2087 preconditions=preconditions, provider=dst_url.scheme, | |
2088 fields=UPLOAD_RETURN_FIELDS, size=src_obj_metadata.size, | |
2089 progress_callback=FileProgressCallbackHandler( | |
2090 ConstructAnnounceText('Uploading', dst_url.url_string), | |
2091 logger).call, | |
2092 tracker_callback=_DummyTrackerCallback) | |
2093 end_time = time.time() | |
2094 | |
2095 try: | |
2096 _CheckCloudHashes(logger, src_url, dst_url, src_obj_metadata, | |
2097 uploaded_object) | |
2098 except HashMismatchException: | |
2099 if _RENAME_ON_HASH_MISMATCH: | |
2100 corrupted_obj_metadata = apitools_messages.Object( | |
2101 name=dst_obj_metadata.name, | |
2102 bucket=dst_obj_metadata.bucket, | |
2103 etag=uploaded_object.etag) | |
2104 dst_obj_metadata.name = (dst_url.object_name + | |
2105 _RENAME_ON_HASH_MISMATCH_SUFFIX) | |
2106 gsutil_api.CopyObject(corrupted_obj_metadata, | |
2107 dst_obj_metadata, provider=dst_url.scheme) | |
2108 # If the digest doesn't match, delete the object. | |
2109 gsutil_api.DeleteObject(dst_url.bucket_name, dst_url.object_name, | |
2110 generation=uploaded_object.generation, | |
2111 provider=dst_url.scheme) | |
2112 raise | |
2113 | |
2114 result_url = dst_url.Clone() | |
2115 result_url.generation = GenerationFromUrlAndString( | |
2116 result_url, uploaded_object.generation) | |
2117 | |
2118 return (end_time - start_time, src_obj_metadata.size, result_url, | |
2119 uploaded_object.md5Hash) | |
2120 | |
2121 | |
2122 # pylint: disable=undefined-variable | |
2123 # pylint: disable=too-many-statements | |
2124 def PerformCopy(logger, src_url, dst_url, gsutil_api, command_obj, | |
2125 copy_exception_handler, allow_splitting=True, | |
2126 headers=None, manifest=None, gzip_exts=None, test_method=None): | |
2127 """Performs copy from src_url to dst_url, handling various special cases. | |
2128 | |
2129 Args: | |
2130 logger: for outputting log messages. | |
2131 src_url: Source StorageUrl. | |
2132 dst_url: Destination StorageUrl. | |
2133 gsutil_api: gsutil Cloud API instance to use for the copy. | |
2134 command_obj: command object for use in Apply in parallel composite uploads. | |
2135 copy_exception_handler: for handling copy exceptions during Apply. | |
2136 allow_splitting: Whether to allow the file to be split into component | |
2137 pieces for an parallel composite upload. | |
2138 headers: optional headers to use for the copy operation. | |
2139 manifest: optional manifest for tracking copy operations. | |
2140 gzip_exts: List of file extensions to gzip for uploads, if any. | |
2141 test_method: optional test method for modifying files during unit tests. | |
2142 | |
2143 Returns: | |
2144 (elapsed_time, bytes_transferred, version-specific dst_url) excluding | |
2145 overhead like initial GET. | |
2146 | |
2147 Raises: | |
2148 ItemExistsError: if no clobber flag is specified and the destination | |
2149 object already exists. | |
2150 SkipUnsupportedObjectError: if skip_unsupported_objects flag is specified | |
2151 and the source is an unsupported type. | |
2152 CommandException: if other errors encountered. | |
2153 """ | |
2154 if headers: | |
2155 dst_obj_headers = headers.copy() | |
2156 else: | |
2157 dst_obj_headers = {} | |
2158 | |
2159 # Create a metadata instance for each destination object so metadata | |
2160 # such as content-type can be applied per-object. | |
2161 # Initialize metadata from any headers passed in via -h. | |
2162 dst_obj_metadata = ObjectMetadataFromHeaders(dst_obj_headers) | |
2163 | |
2164 if dst_url.IsCloudUrl() and dst_url.scheme == 'gs': | |
2165 preconditions = PreconditionsFromHeaders(dst_obj_headers) | |
2166 else: | |
2167 preconditions = Preconditions() | |
2168 | |
2169 src_obj_metadata = None | |
2170 src_obj_filestream = None | |
2171 if src_url.IsCloudUrl(): | |
2172 src_obj_fields = None | |
2173 if dst_url.IsCloudUrl(): | |
2174 # For cloud or daisy chain copy, we need every copyable field. | |
2175 # If we're not modifying or overriding any of the fields, we can get | |
2176 # away without retrieving the object metadata because the copy | |
2177 # operation can succeed with just the destination bucket and object | |
2178 # name. But if we are sending any metadata, the JSON API will expect a | |
2179 # complete object resource. Since we want metadata like the object size | |
2180 # for our own tracking, we just get all of the metadata here. | |
2181 src_obj_fields = ['cacheControl', 'componentCount', | |
2182 'contentDisposition', 'contentEncoding', | |
2183 'contentLanguage', 'contentType', 'crc32c', | |
2184 'etag', 'generation', 'md5Hash', 'mediaLink', | |
2185 'metadata', 'metageneration', 'size'] | |
2186 # We only need the ACL if we're going to preserve it. | |
2187 if global_copy_helper_opts.preserve_acl: | |
2188 src_obj_fields.append('acl') | |
2189 if (src_url.scheme == dst_url.scheme | |
2190 and not global_copy_helper_opts.daisy_chain): | |
2191 copy_in_the_cloud = True | |
2192 else: | |
2193 copy_in_the_cloud = False | |
2194 else: | |
2195 # Just get the fields needed to validate the download. | |
2196 src_obj_fields = ['crc32c', 'contentEncoding', 'contentType', 'etag', | |
2197 'mediaLink', 'md5Hash', 'size'] | |
2198 | |
2199 if (src_url.scheme == 's3' and | |
2200 global_copy_helper_opts.skip_unsupported_objects): | |
2201 src_obj_fields.append('storageClass') | |
2202 | |
2203 try: | |
2204 src_generation = GenerationFromUrlAndString(src_url, src_url.generation) | |
2205 src_obj_metadata = gsutil_api.GetObjectMetadata( | |
2206 src_url.bucket_name, src_url.object_name, | |
2207 generation=src_generation, provider=src_url.scheme, | |
2208 fields=src_obj_fields) | |
2209 except NotFoundException: | |
2210 raise CommandException( | |
2211 'NotFoundException: Could not retrieve source object %s.' % | |
2212 src_url.url_string) | |
2213 if (src_url.scheme == 's3' and | |
2214 global_copy_helper_opts.skip_unsupported_objects and | |
2215 src_obj_metadata.storageClass == 'GLACIER'): | |
2216 raise SkipGlacierError() | |
2217 | |
2218 src_obj_size = src_obj_metadata.size | |
2219 dst_obj_metadata.contentType = src_obj_metadata.contentType | |
2220 if global_copy_helper_opts.preserve_acl: | |
2221 dst_obj_metadata.acl = src_obj_metadata.acl | |
2222 # Special case for S3-to-S3 copy URLs using | |
2223 # global_copy_helper_opts.preserve_acl. | |
2224 # dst_url will be verified in _CopyObjToObjDaisyChainMode if it | |
2225 # is not s3 (and thus differs from src_url). | |
2226 if src_url.scheme == 's3': | |
2227 acl_text = S3MarkerAclFromObjectMetadata(src_obj_metadata) | |
2228 if acl_text: | |
2229 AddS3MarkerAclToObjectMetadata(dst_obj_metadata, acl_text) | |
2230 else: | |
2231 try: | |
2232 src_obj_filestream = GetStreamFromFileUrl(src_url) | |
2233 except Exception, e: # pylint: disable=broad-except | |
2234 raise CommandException('Error opening file "%s": %s.' % (src_url, | |
2235 e.message)) | |
2236 if src_url.IsStream(): | |
2237 src_obj_size = None | |
2238 else: | |
2239 src_obj_size = os.path.getsize(src_url.object_name) | |
2240 | |
2241 if global_copy_helper_opts.use_manifest: | |
2242 # Set the source size in the manifest. | |
2243 manifest.Set(src_url.url_string, 'size', src_obj_size) | |
2244 | |
2245 if (dst_url.scheme == 's3' and src_obj_size > S3_MAX_UPLOAD_SIZE | |
2246 and src_url != 's3'): | |
2247 raise CommandException( | |
2248 '"%s" exceeds the maximum gsutil-supported size for an S3 upload. S3 ' | |
2249 'objects greater than %s in size require multipart uploads, which ' | |
2250 'gsutil does not support.' % (src_url, | |
2251 MakeHumanReadable(S3_MAX_UPLOAD_SIZE))) | |
2252 | |
2253 # On Windows, stdin is opened as text mode instead of binary which causes | |
2254 # problems when piping a binary file, so this switches it to binary mode. | |
2255 if IS_WINDOWS and src_url.IsFileUrl() and src_url.IsStream(): | |
2256 msvcrt.setmode(GetStreamFromFileUrl(src_url).fileno(), os.O_BINARY) | |
2257 | |
2258 if global_copy_helper_opts.no_clobber: | |
2259 # There are two checks to prevent clobbering: | |
2260 # 1) The first check is to see if the URL | |
2261 # already exists at the destination and prevent the upload/download | |
2262 # from happening. This is done by the exists() call. | |
2263 # 2) The second check is only relevant if we are writing to gs. We can | |
2264 # enforce that the server only writes the object if it doesn't exist | |
2265 # by specifying the header below. This check only happens at the | |
2266 # server after the complete file has been uploaded. We specify this | |
2267 # header to prevent a race condition where a destination file may | |
2268 # be created after the first check and before the file is fully | |
2269 # uploaded. | |
2270 # In order to save on unnecessary uploads/downloads we perform both | |
2271 # checks. However, this may come at the cost of additional HTTP calls. | |
2272 if preconditions.gen_match: | |
2273 raise ArgumentException('Specifying x-goog-if-generation-match is ' | |
2274 'not supported with cp -n') | |
2275 else: | |
2276 preconditions.gen_match = 0 | |
2277 if dst_url.IsFileUrl() and os.path.exists(dst_url.object_name): | |
2278 # The local file may be a partial. Check the file sizes. | |
2279 if src_obj_size == os.path.getsize(dst_url.object_name): | |
2280 raise ItemExistsError() | |
2281 elif dst_url.IsCloudUrl(): | |
2282 try: | |
2283 dst_object = gsutil_api.GetObjectMetadata( | |
2284 dst_url.bucket_name, dst_url.object_name, provider=dst_url.scheme) | |
2285 except NotFoundException: | |
2286 dst_object = None | |
2287 if dst_object: | |
2288 raise ItemExistsError() | |
2289 | |
2290 if dst_url.IsCloudUrl(): | |
2291 # Cloud storage API gets object and bucket name from metadata. | |
2292 dst_obj_metadata.name = dst_url.object_name | |
2293 dst_obj_metadata.bucket = dst_url.bucket_name | |
2294 if src_url.IsCloudUrl(): | |
2295 # Preserve relevant metadata from the source object if it's not already | |
2296 # provided from the headers. | |
2297 CopyObjectMetadata(src_obj_metadata, dst_obj_metadata, override=False) | |
2298 src_obj_metadata.name = src_url.object_name | |
2299 src_obj_metadata.bucket = src_url.bucket_name | |
2300 else: | |
2301 _SetContentTypeFromFile(src_url, dst_obj_metadata) | |
2302 else: | |
2303 # Files don't have Cloud API metadata. | |
2304 dst_obj_metadata = None | |
2305 | |
2306 _LogCopyOperation(logger, src_url, dst_url, dst_obj_metadata) | |
2307 | |
2308 if src_url.IsCloudUrl(): | |
2309 if dst_url.IsFileUrl(): | |
2310 return _DownloadObjectToFile(src_url, src_obj_metadata, dst_url, | |
2311 gsutil_api, logger, test_method=test_method) | |
2312 elif copy_in_the_cloud: | |
2313 return _CopyObjToObjInTheCloud(src_url, src_obj_metadata, dst_url, | |
2314 dst_obj_metadata, preconditions, | |
2315 gsutil_api, logger) | |
2316 else: | |
2317 return _CopyObjToObjDaisyChainMode(src_url, src_obj_metadata, | |
2318 dst_url, dst_obj_metadata, | |
2319 preconditions, gsutil_api, logger) | |
2320 else: # src_url.IsFileUrl() | |
2321 if dst_url.IsCloudUrl(): | |
2322 return _UploadFileToObject( | |
2323 src_url, src_obj_filestream, src_obj_size, dst_url, | |
2324 dst_obj_metadata, preconditions, gsutil_api, logger, command_obj, | |
2325 copy_exception_handler, gzip_exts=gzip_exts, | |
2326 allow_splitting=allow_splitting) | |
2327 else: # dst_url.IsFileUrl() | |
2328 return _CopyFileToFile(src_url, dst_url) | |
2329 | |
2330 | |
2331 class Manifest(object): | |
2332 """Stores the manifest items for the CpCommand class.""" | |
2333 | |
2334 def __init__(self, path): | |
2335 # self.items contains a dictionary of rows | |
2336 self.items = {} | |
2337 self.manifest_filter = {} | |
2338 self.lock = CreateLock() | |
2339 | |
2340 self.manifest_path = os.path.expanduser(path) | |
2341 self._ParseManifest() | |
2342 self._CreateManifestFile() | |
2343 | |
2344 def _ParseManifest(self): | |
2345 """Load and parse a manifest file. | |
2346 | |
2347 This information will be used to skip any files that have a skip or OK | |
2348 status. | |
2349 """ | |
2350 try: | |
2351 if os.path.exists(self.manifest_path): | |
2352 with open(self.manifest_path, 'rb') as f: | |
2353 first_row = True | |
2354 reader = csv.reader(f) | |
2355 for row in reader: | |
2356 if first_row: | |
2357 try: | |
2358 source_index = row.index('Source') | |
2359 result_index = row.index('Result') | |
2360 except ValueError: | |
2361 # No header and thus not a valid manifest file. | |
2362 raise CommandException( | |
2363 'Missing headers in manifest file: %s' % self.manifest_path) | |
2364 first_row = False | |
2365 source = row[source_index] | |
2366 result = row[result_index] | |
2367 if result in ['OK', 'skip']: | |
2368 # We're always guaranteed to take the last result of a specific | |
2369 # source url. | |
2370 self.manifest_filter[source] = result | |
2371 except IOError: | |
2372 raise CommandException('Could not parse %s' % self.manifest_path) | |
2373 | |
2374 def WasSuccessful(self, src): | |
2375 """Returns whether the specified src url was marked as successful.""" | |
2376 return src in self.manifest_filter | |
2377 | |
2378 def _CreateManifestFile(self): | |
2379 """Opens the manifest file and assigns it to the file pointer.""" | |
2380 try: | |
2381 if ((not os.path.exists(self.manifest_path)) | |
2382 or (os.stat(self.manifest_path).st_size == 0)): | |
2383 # Add headers to the new file. | |
2384 with open(self.manifest_path, 'wb', 1) as f: | |
2385 writer = csv.writer(f) | |
2386 writer.writerow(['Source', | |
2387 'Destination', | |
2388 'Start', | |
2389 'End', | |
2390 'Md5', | |
2391 'UploadId', | |
2392 'Source Size', | |
2393 'Bytes Transferred', | |
2394 'Result', | |
2395 'Description']) | |
2396 except IOError: | |
2397 raise CommandException('Could not create manifest file.') | |
2398 | |
2399 def Set(self, url, key, value): | |
2400 if value is None: | |
2401 # In case we don't have any information to set we bail out here. | |
2402 # This is so that we don't clobber existing information. | |
2403 # To zero information pass '' instead of None. | |
2404 return | |
2405 if url in self.items: | |
2406 self.items[url][key] = value | |
2407 else: | |
2408 self.items[url] = {key: value} | |
2409 | |
2410 def Initialize(self, source_url, destination_url): | |
2411 # Always use the source_url as the key for the item. This is unique. | |
2412 self.Set(source_url, 'source_uri', source_url) | |
2413 self.Set(source_url, 'destination_uri', destination_url) | |
2414 self.Set(source_url, 'start_time', datetime.datetime.utcnow()) | |
2415 | |
2416 def SetResult(self, source_url, bytes_transferred, result, | |
2417 description=''): | |
2418 self.Set(source_url, 'bytes', bytes_transferred) | |
2419 self.Set(source_url, 'result', result) | |
2420 self.Set(source_url, 'description', description) | |
2421 self.Set(source_url, 'end_time', datetime.datetime.utcnow()) | |
2422 self._WriteRowToManifestFile(source_url) | |
2423 self._RemoveItemFromManifest(source_url) | |
2424 | |
2425 def _WriteRowToManifestFile(self, url): | |
2426 """Writes a manifest entry to the manifest file for the url argument.""" | |
2427 row_item = self.items[url] | |
2428 data = [ | |
2429 str(row_item['source_uri'].encode(UTF8)), | |
2430 str(row_item['destination_uri'].encode(UTF8)), | |
2431 '%sZ' % row_item['start_time'].isoformat(), | |
2432 '%sZ' % row_item['end_time'].isoformat(), | |
2433 row_item['md5'] if 'md5' in row_item else '', | |
2434 row_item['upload_id'] if 'upload_id' in row_item else '', | |
2435 str(row_item['size']) if 'size' in row_item else '', | |
2436 str(row_item['bytes']) if 'bytes' in row_item else '', | |
2437 row_item['result'], | |
2438 row_item['description'].encode(UTF8)] | |
2439 | |
2440 # Aquire a lock to prevent multiple threads writing to the same file at | |
2441 # the same time. This would cause a garbled mess in the manifest file. | |
2442 with self.lock: | |
2443 with open(self.manifest_path, 'a', 1) as f: # 1 == line buffered | |
2444 writer = csv.writer(f) | |
2445 writer.writerow(data) | |
2446 | |
2447 def _RemoveItemFromManifest(self, url): | |
2448 # Remove the item from the dictionary since we're done with it and | |
2449 # we don't want the dictionary to grow too large in memory for no good | |
2450 # reason. | |
2451 del self.items[url] | |
2452 | |
2453 | |
2454 class ItemExistsError(Exception): | |
2455 """Exception class for objects that are skipped because they already exist.""" | |
2456 pass | |
2457 | |
2458 | |
2459 class SkipUnsupportedObjectError(Exception): | |
2460 """Exception for objects skipped because they are an unsupported type.""" | |
2461 | |
2462 def __init__(self): | |
2463 super(SkipUnsupportedObjectError, self).__init__() | |
2464 self.unsupported_type = 'Unknown' | |
2465 | |
2466 | |
2467 class SkipGlacierError(SkipUnsupportedObjectError): | |
2468 """Exception for objects skipped because they are an unsupported type.""" | |
2469 | |
2470 def __init__(self): | |
2471 super(SkipGlacierError, self).__init__() | |
2472 self.unsupported_type = 'GLACIER' | |
2473 | |
2474 | |
2475 def GetPathBeforeFinalDir(url): | |
2476 """Returns the path section before the final directory component of the URL. | |
2477 | |
2478 This handles cases for file system directories, bucket, and bucket | |
2479 subdirectories. Example: for gs://bucket/dir/ we'll return 'gs://bucket', | |
2480 and for file://dir we'll return file:// | |
2481 | |
2482 Args: | |
2483 url: StorageUrl representing a filesystem directory, cloud bucket or | |
2484 bucket subdir. | |
2485 | |
2486 Returns: | |
2487 String name of above-described path, sans final path separator. | |
2488 """ | |
2489 sep = url.delim | |
2490 if url.IsFileUrl(): | |
2491 past_scheme = url.url_string[len('file://'):] | |
2492 if past_scheme.find(sep) == -1: | |
2493 return 'file://' | |
2494 else: | |
2495 return 'file://%s' % past_scheme.rstrip(sep).rpartition(sep)[0] | |
2496 if url.IsBucket(): | |
2497 return '%s://' % url.scheme | |
2498 # Else it names a bucket subdir. | |
2499 return url.url_string.rstrip(sep).rpartition(sep)[0] | |
2500 | |
2501 | |
2502 def _DivideAndCeil(dividend, divisor): | |
2503 """Returns ceil(dividend / divisor). | |
2504 | |
2505 Takes care to avoid the pitfalls of floating point arithmetic that could | |
2506 otherwise yield the wrong result for large numbers. | |
2507 | |
2508 Args: | |
2509 dividend: Dividend for the operation. | |
2510 divisor: Divisor for the operation. | |
2511 | |
2512 Returns: | |
2513 Quotient. | |
2514 """ | |
2515 quotient = dividend // divisor | |
2516 if (dividend % divisor) != 0: | |
2517 quotient += 1 | |
2518 return quotient | |
2519 | |
2520 | |
2521 def _GetPartitionInfo(file_size, max_components, default_component_size): | |
2522 """Gets info about a file partition for parallel composite uploads. | |
2523 | |
2524 Args: | |
2525 file_size: The number of bytes in the file to be partitioned. | |
2526 max_components: The maximum number of components that can be composed. | |
2527 default_component_size: The size of a component, assuming that | |
2528 max_components is infinite. | |
2529 Returns: | |
2530 The number of components in the partitioned file, and the size of each | |
2531 component (except the last, which will have a different size iff | |
2532 file_size != 0 (mod num_components)). | |
2533 """ | |
2534 # num_components = ceil(file_size / default_component_size) | |
2535 num_components = _DivideAndCeil(file_size, default_component_size) | |
2536 | |
2537 # num_components must be in the range [2, max_components] | |
2538 num_components = max(min(num_components, max_components), 2) | |
2539 | |
2540 # component_size = ceil(file_size / num_components) | |
2541 component_size = _DivideAndCeil(file_size, num_components) | |
2542 return (num_components, component_size) | |
2543 | |
2544 | |
2545 def _DeleteObjectFn(cls, url_to_delete, thread_state=None): | |
2546 """Wrapper function to be used with command.Apply().""" | |
2547 gsutil_api = GetCloudApiInstance(cls, thread_state) | |
2548 gsutil_api.DeleteObject( | |
2549 url_to_delete.bucket_name, url_to_delete.object_name, | |
2550 generation=url_to_delete.generation, provider=url_to_delete.scheme) | |
2551 | |
2552 | |
2553 def _ParseParallelUploadTrackerFile(tracker_file, tracker_file_lock): | |
2554 """Parse the tracker file from the last parallel composite upload attempt. | |
2555 | |
2556 If it exists, the tracker file is of the format described in | |
2557 _CreateParallelUploadTrackerFile. If the file doesn't exist or cannot be | |
2558 read, then the upload will start from the beginning. | |
2559 | |
2560 Args: | |
2561 tracker_file: The name of the file to parse. | |
2562 tracker_file_lock: Lock protecting access to the tracker file. | |
2563 | |
2564 Returns: | |
2565 random_prefix: A randomly-generated prefix to the name of the | |
2566 temporary components. | |
2567 existing_objects: A list of ObjectFromTracker objects representing | |
2568 the set of files that have already been uploaded. | |
2569 """ | |
2570 | |
2571 def GenerateRandomPrefix(): | |
2572 return str(random.randint(1, (10 ** 10) - 1)) | |
2573 | |
2574 existing_objects = [] | |
2575 try: | |
2576 with tracker_file_lock: | |
2577 with open(tracker_file, 'r') as fp: | |
2578 lines = fp.readlines() | |
2579 lines = [line.strip() for line in lines] | |
2580 if not lines: | |
2581 print('Parallel upload tracker file (%s) was invalid. ' | |
2582 'Restarting upload from scratch.' % tracker_file) | |
2583 lines = [GenerateRandomPrefix()] | |
2584 | |
2585 except IOError as e: | |
2586 # We can't read the tracker file, so generate a new random prefix. | |
2587 lines = [GenerateRandomPrefix()] | |
2588 | |
2589 # Ignore non-existent file (happens first time an upload | |
2590 # is attempted on a file), but warn user for other errors. | |
2591 if e.errno != errno.ENOENT: | |
2592 # Will restart because we failed to read in the file. | |
2593 print('Couldn\'t read parallel upload tracker file (%s): %s. ' | |
2594 'Restarting upload from scratch.' % (tracker_file, e.strerror)) | |
2595 | |
2596 # The first line contains the randomly-generated prefix. | |
2597 random_prefix = lines[0] | |
2598 | |
2599 # The remaining lines were written in pairs to describe a single component | |
2600 # in the form: | |
2601 # object_name (without random prefix) | |
2602 # generation | |
2603 # Newlines are used as the delimiter because only newlines and carriage | |
2604 # returns are invalid characters in object names, and users can specify | |
2605 # a custom prefix in the config file. | |
2606 i = 1 | |
2607 while i < len(lines): | |
2608 (name, generation) = (lines[i], lines[i+1]) | |
2609 if not generation: | |
2610 # Cover the '' case. | |
2611 generation = None | |
2612 existing_objects.append(ObjectFromTracker(name, generation)) | |
2613 i += 2 | |
2614 return (random_prefix, existing_objects) | |
2615 | |
2616 | |
2617 def _AppendComponentTrackerToParallelUploadTrackerFile(tracker_file, component, | |
2618 tracker_file_lock): | |
2619 """Appends info about the uploaded component to an existing tracker file. | |
2620 | |
2621 Follows the format described in _CreateParallelUploadTrackerFile. | |
2622 | |
2623 Args: | |
2624 tracker_file: Tracker file to append to. | |
2625 component: Component that was uploaded. | |
2626 tracker_file_lock: Thread and process-safe Lock for the tracker file. | |
2627 """ | |
2628 lines = _GetParallelUploadTrackerFileLinesForComponents([component]) | |
2629 lines = [line + '\n' for line in lines] | |
2630 with tracker_file_lock: | |
2631 with open(tracker_file, 'a') as f: | |
2632 f.writelines(lines) | |
2633 | |
2634 | |
2635 def _CreateParallelUploadTrackerFile(tracker_file, random_prefix, components, | |
2636 tracker_file_lock): | |
2637 """Writes information about components that were successfully uploaded. | |
2638 | |
2639 This way the upload can be resumed at a later date. The tracker file has | |
2640 the format: | |
2641 random_prefix | |
2642 temp_object_1_name | |
2643 temp_object_1_generation | |
2644 . | |
2645 . | |
2646 . | |
2647 temp_object_N_name | |
2648 temp_object_N_generation | |
2649 where N is the number of components that have been successfully uploaded. | |
2650 | |
2651 Args: | |
2652 tracker_file: The name of the parallel upload tracker file. | |
2653 random_prefix: The randomly-generated prefix that was used for | |
2654 for uploading any existing components. | |
2655 components: A list of ObjectFromTracker objects that were uploaded. | |
2656 tracker_file_lock: The lock protecting access to the tracker file. | |
2657 """ | |
2658 lines = [random_prefix] | |
2659 lines += _GetParallelUploadTrackerFileLinesForComponents(components) | |
2660 lines = [line + '\n' for line in lines] | |
2661 try: | |
2662 with tracker_file_lock: | |
2663 open(tracker_file, 'w').close() # Clear the file. | |
2664 with open(tracker_file, 'w') as f: | |
2665 f.writelines(lines) | |
2666 except IOError as e: | |
2667 RaiseUnwritableTrackerFileException(tracker_file, e.strerror) | |
2668 | |
2669 | |
2670 def _GetParallelUploadTrackerFileLinesForComponents(components): | |
2671 """Return a list of the lines for use in a parallel upload tracker file. | |
2672 | |
2673 The lines represent the given components, using the format as described in | |
2674 _CreateParallelUploadTrackerFile. | |
2675 | |
2676 Args: | |
2677 components: A list of ObjectFromTracker objects that were uploaded. | |
2678 | |
2679 Returns: | |
2680 Lines describing components with their generation for outputting to the | |
2681 tracker file. | |
2682 """ | |
2683 lines = [] | |
2684 for component in components: | |
2685 generation = None | |
2686 generation = component.generation | |
2687 if not generation: | |
2688 generation = '' | |
2689 lines += [component.object_name, str(generation)] | |
2690 return lines | |
2691 | |
2692 | |
2693 def FilterExistingComponents(dst_args, existing_components, bucket_url, | |
2694 gsutil_api): | |
2695 """Determines course of action for component objects. | |
2696 | |
2697 Given the list of all target objects based on partitioning the file and | |
2698 the list of objects that have already been uploaded successfully, | |
2699 this function determines which objects should be uploaded, which | |
2700 existing components are still valid, and which existing components should | |
2701 be deleted. | |
2702 | |
2703 Args: | |
2704 dst_args: The map of file_name -> PerformParallelUploadFileToObjectArgs | |
2705 calculated by partitioning the file. | |
2706 existing_components: A list of ObjectFromTracker objects that have been | |
2707 uploaded in the past. | |
2708 bucket_url: CloudUrl of the bucket in which the components exist. | |
2709 gsutil_api: gsutil Cloud API instance to use for retrieving object metadata. | |
2710 | |
2711 Returns: | |
2712 components_to_upload: List of components that need to be uploaded. | |
2713 uploaded_components: List of components that have already been | |
2714 uploaded and are still valid. | |
2715 existing_objects_to_delete: List of components that have already | |
2716 been uploaded, but are no longer valid | |
2717 and are in a versioned bucket, and | |
2718 therefore should be deleted. | |
2719 """ | |
2720 components_to_upload = [] | |
2721 existing_component_names = [component.object_name | |
2722 for component in existing_components] | |
2723 for component_name in dst_args: | |
2724 if component_name not in existing_component_names: | |
2725 components_to_upload.append(dst_args[component_name]) | |
2726 | |
2727 objects_already_chosen = [] | |
2728 | |
2729 # Don't reuse any temporary components whose MD5 doesn't match the current | |
2730 # MD5 of the corresponding part of the file. If the bucket is versioned, | |
2731 # also make sure that we delete the existing temporary version. | |
2732 existing_objects_to_delete = [] | |
2733 uploaded_components = [] | |
2734 for tracker_object in existing_components: | |
2735 if (tracker_object.object_name not in dst_args.keys() | |
2736 or tracker_object.object_name in objects_already_chosen): | |
2737 # This could happen if the component size has changed. This also serves | |
2738 # to handle object names that get duplicated in the tracker file due | |
2739 # to people doing things they shouldn't (e.g., overwriting an existing | |
2740 # temporary component in a versioned bucket). | |
2741 | |
2742 url = bucket_url.Clone() | |
2743 url.object_name = tracker_object.object_name | |
2744 url.generation = tracker_object.generation | |
2745 existing_objects_to_delete.append(url) | |
2746 continue | |
2747 | |
2748 dst_arg = dst_args[tracker_object.object_name] | |
2749 file_part = FilePart(dst_arg.filename, dst_arg.file_start, | |
2750 dst_arg.file_length) | |
2751 # TODO: calculate MD5's in parallel when possible. | |
2752 content_md5 = CalculateB64EncodedMd5FromContents(file_part) | |
2753 | |
2754 try: | |
2755 # Get the MD5 of the currently-existing component. | |
2756 dst_url = dst_arg.dst_url | |
2757 dst_metadata = gsutil_api.GetObjectMetadata( | |
2758 dst_url.bucket_name, dst_url.object_name, | |
2759 generation=dst_url.generation, provider=dst_url.scheme, | |
2760 fields=['md5Hash', 'etag']) | |
2761 cloud_md5 = dst_metadata.md5Hash | |
2762 except Exception: # pylint: disable=broad-except | |
2763 # We don't actually care what went wrong - we couldn't retrieve the | |
2764 # object to check the MD5, so just upload it again. | |
2765 cloud_md5 = None | |
2766 | |
2767 if cloud_md5 != content_md5: | |
2768 components_to_upload.append(dst_arg) | |
2769 objects_already_chosen.append(tracker_object.object_name) | |
2770 if tracker_object.generation: | |
2771 # If the old object doesn't have a generation (i.e., it isn't in a | |
2772 # versioned bucket), then we will just overwrite it anyway. | |
2773 invalid_component_with_generation = dst_arg.dst_url.Clone() | |
2774 invalid_component_with_generation.generation = tracker_object.generation | |
2775 existing_objects_to_delete.append(invalid_component_with_generation) | |
2776 else: | |
2777 url = dst_arg.dst_url.Clone() | |
2778 url.generation = tracker_object.generation | |
2779 uploaded_components.append(url) | |
2780 objects_already_chosen.append(tracker_object.object_name) | |
2781 | |
2782 if uploaded_components: | |
2783 logging.info('Found %d existing temporary components to reuse.', | |
2784 len(uploaded_components)) | |
2785 | |
2786 return (components_to_upload, uploaded_components, | |
2787 existing_objects_to_delete) | |
OLD | NEW |