Chromium Code Reviews

Side by Side Diff: third_party/gsutil/gslib/copy_helper.py

Issue 1377933002: [catapult] - Copy Telemetry's gsutilz over to third_party. (Closed) Base URL: https://github.com/catapult-project/catapult.git@master
Patch Set: Rename to gsutil. Created 5 years, 2 months ago
1 # -*- coding: utf-8 -*-
2 # Copyright 2011 Google Inc. All Rights Reserved.
3 # Copyright 2011, Nexenta Systems Inc.
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 """Helper functions for copy functionality."""
17
18 from __future__ import absolute_import
19
20 import base64
21 from collections import namedtuple
22 import csv
23 import datetime
24 import errno
25 import gzip
26 from hashlib import md5
27 import json
28 import logging
29 import mimetypes
30 import multiprocessing
31 import os
32 import pickle
33 import random
34 import re
35 import shutil
36 import stat
37 import subprocess
38 import tempfile
39 import textwrap
40 import time
41 import traceback
42
43 from boto import config
44 import crcmod
45
46 import gslib
47 from gslib.cloud_api import ArgumentException
48 from gslib.cloud_api import CloudApi
49 from gslib.cloud_api import NotFoundException
50 from gslib.cloud_api import PreconditionException
51 from gslib.cloud_api import Preconditions
52 from gslib.cloud_api import ResumableDownloadException
53 from gslib.cloud_api import ResumableUploadAbortException
54 from gslib.cloud_api import ResumableUploadException
55 from gslib.cloud_api import ResumableUploadStartOverException
56 from gslib.cloud_api_helper import GetDownloadSerializationDict
57 from gslib.commands.compose import MAX_COMPOSE_ARITY
58 from gslib.commands.config import DEFAULT_PARALLEL_COMPOSITE_UPLOAD_COMPONENT_SIZE
59 from gslib.commands.config import DEFAULT_PARALLEL_COMPOSITE_UPLOAD_THRESHOLD
60 from gslib.cs_api_map import ApiSelector
61 from gslib.daisy_chain_wrapper import DaisyChainWrapper
62 from gslib.exception import CommandException
63 from gslib.exception import HashMismatchException
64 from gslib.file_part import FilePart
65 from gslib.hashing_helper import Base64EncodeHash
66 from gslib.hashing_helper import CalculateB64EncodedMd5FromContents
67 from gslib.hashing_helper import CalculateHashesFromContents
68 from gslib.hashing_helper import GetDownloadHashAlgs
69 from gslib.hashing_helper import GetUploadHashAlgs
70 from gslib.hashing_helper import HashingFileUploadWrapper
71 from gslib.parallelism_framework_util import ThreadAndProcessSafeDict
72 from gslib.parallelism_framework_util import ThreadSafeDict
73 from gslib.progress_callback import ConstructAnnounceText
74 from gslib.progress_callback import FileProgressCallbackHandler
75 from gslib.progress_callback import ProgressCallbackWithBackoff
76 from gslib.resumable_streaming_upload import ResumableStreamingJsonUploadWrapper
77 from gslib.storage_url import ContainsWildcard
78 from gslib.storage_url import StorageUrlFromString
79 from gslib.third_party.storage_apitools import storage_v1_messages as apitools_messages
80 from gslib.tracker_file import DeleteTrackerFile
81 from gslib.tracker_file import GetTrackerFilePath
82 from gslib.tracker_file import RaiseUnwritableTrackerFileException
83 from gslib.tracker_file import ReadOrCreateDownloadTrackerFile
84 from gslib.tracker_file import TrackerFileType
85 from gslib.translation_helper import AddS3MarkerAclToObjectMetadata
86 from gslib.translation_helper import CopyObjectMetadata
87 from gslib.translation_helper import DEFAULT_CONTENT_TYPE
88 from gslib.translation_helper import GenerationFromUrlAndString
89 from gslib.translation_helper import ObjectMetadataFromHeaders
90 from gslib.translation_helper import PreconditionsFromHeaders
91 from gslib.translation_helper import S3MarkerAclFromObjectMetadata
92 from gslib.util import CreateLock
93 from gslib.util import DEFAULT_FILE_BUFFER_SIZE
94 from gslib.util import GetCloudApiInstance
95 from gslib.util import GetFileSize
96 from gslib.util import GetJsonResumableChunkSize
97 from gslib.util import GetMaxRetryDelay
98 from gslib.util import GetNumRetries
99 from gslib.util import GetStreamFromFileUrl
100 from gslib.util import HumanReadableToBytes
101 from gslib.util import IS_WINDOWS
102 from gslib.util import IsCloudSubdirPlaceholder
103 from gslib.util import MakeHumanReadable
104 from gslib.util import MIN_SIZE_COMPUTE_LOGGING
105 from gslib.util import MultiprocessingIsAvailable
106 from gslib.util import ResumableThreshold
107 from gslib.util import TEN_MIB
108 from gslib.util import UTF8
109 from gslib.wildcard_iterator import CreateWildcardIterator
110
111 # pylint: disable=g-import-not-at-top
112 if IS_WINDOWS:
113 import msvcrt
114 from ctypes import c_int
115 from ctypes import c_uint64
116 from ctypes import c_char_p
117 from ctypes import c_wchar_p
118 from ctypes import windll
119 from ctypes import POINTER
120 from ctypes import WINFUNCTYPE
121 from ctypes import WinError
122
123 # Declare copy_helper_opts as a global because namedtuple isn't aware of
124 # assigning to a class member (which breaks pickling done by multiprocessing).
125 # For details see
126 # http://stackoverflow.com/questions/16377215/how-to-pickle-a-namedtuple-instance-correctly
127 # Similarly can't pickle logger.
128 # pylint: disable=global-at-module-level
129 global global_copy_helper_opts, global_logger
130
131 # In-memory map of local files that are currently opened for write. Used to
132 # ensure that if we write to the same file twice (say, for example, because the
133 # user specified two identical source URLs), the writes occur serially.
134 global open_files_map
135 open_files_map = (
136 ThreadSafeDict() if (IS_WINDOWS or not MultiprocessingIsAvailable()[0])
137 else ThreadAndProcessSafeDict(multiprocessing.Manager()))
138
139 # For debugging purposes; if True, files and objects that fail hash validation
140 # will be saved with the below suffix appended.
141 _RENAME_ON_HASH_MISMATCH = False
142 _RENAME_ON_HASH_MISMATCH_SUFFIX = '_corrupt'
143
144 PARALLEL_UPLOAD_TEMP_NAMESPACE = (
145 u'/gsutil/tmp/parallel_composite_uploads/for_details_see/gsutil_help_cp/')
146
147 PARALLEL_UPLOAD_STATIC_SALT = u"""
148 PARALLEL_UPLOAD_SALT_TO_PREVENT_COLLISIONS.
149 The theory is that no user will have prepended this to the front of
150 one of their object names and then done an MD5 hash of the name, and
151 then prepended PARALLEL_UPLOAD_TEMP_NAMESPACE to the front of their object
152 name. Note that there will be no problems with object name length since we
153 hash the original name.
154 """
155
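Reviewer note: a minimal, stdlib-only sketch of how this salt is combined with the original object name further down in _PartitionFile to build fixed-length temporary component names; the prefix, file name, and index below are hypothetical.

from hashlib import md5

def _example_component_name(random_prefix, salt, namespace, file_name, i):
  # Hash salt + original name so the component name length stays bounded.
  digest = md5((salt + file_name).encode('utf-8')).hexdigest()
  return random_prefix + namespace + digest + '_' + str(i)

# e.g. _example_component_name('4a7f', PARALLEL_UPLOAD_STATIC_SALT,
#                              PARALLEL_UPLOAD_TEMP_NAMESPACE, u'video.mp4', 0)
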
156 # When uploading a file, get the following fields in the response for
157 # filling in command output and manifests.
158 UPLOAD_RETURN_FIELDS = ['crc32c', 'etag', 'generation', 'md5Hash', 'size']
159
160 # This tuple is used only to encapsulate the arguments needed for
161 # command.Apply() in the parallel composite upload case.
162 # Note that content_type is used instead of a full apitools Object() because
163 # apitools objects are not picklable.
164 # filename: String name of file.
165 # file_start: start byte of file (may be in the middle of a file for partitioned
166 # files).
167 # file_length: length of upload (may not be the entire length of a file for
168 # partitioned files).
169 # src_url: FileUrl describing the source file.
170 # dst_url: CloudUrl describing the destination component file.
171 # canned_acl: canned_acl to apply to the uploaded file/component.
172 # content_type: content-type for final object, used for setting content-type
173 # of components and final object.
174 # tracker_file: tracker file for this component.
175 # tracker_file_lock: tracker file lock for tracker file(s).
176 PerformParallelUploadFileToObjectArgs = namedtuple(
177 'PerformParallelUploadFileToObjectArgs',
178 'filename file_start file_length src_url dst_url canned_acl '
179 'content_type tracker_file tracker_file_lock')
180
181 ObjectFromTracker = namedtuple('ObjectFromTracker',
182 'object_name generation')
183
184 # TODO: Refactor this file to be less cumbersome. In particular, some of the
185 # different paths (e.g., uploading a file to an object vs. downloading an
186 # object to a file) could be split into separate files.
187
188 # Chunk size to use while zipping/unzipping gzip files.
189 GZIP_CHUNK_SIZE = 8192
190
191 PARALLEL_COMPOSITE_SUGGESTION_THRESHOLD = 150 * 1024 * 1024
192
193 # S3 requires special Multipart upload logic (that we currently don't implement)
194 # for files > 5GiB in size.
195 S3_MAX_UPLOAD_SIZE = 5 * 1024 * 1024 * 1024
196
197 suggested_parallel_composites = False
198
199
200 class FileConcurrencySkipError(Exception):
201 """Raised when skipping a file due to a concurrent, duplicate copy."""
202
203
204 def _RmExceptionHandler(cls, e):
205 """Simple exception handler to allow post-completion status."""
206 cls.logger.error(str(e))
207
208
209 def _ParallelUploadCopyExceptionHandler(cls, e):
210 """Simple exception handler to allow post-completion status."""
211 cls.logger.error(str(e))
212 cls.op_failure_count += 1
213 cls.logger.debug('\n\nEncountered exception while copying:\n%s\n',
214 traceback.format_exc())
215
216
217 def _PerformParallelUploadFileToObject(cls, args, thread_state=None):
218 """Function argument to Apply for performing parallel composite uploads.
219
220 Args:
221 cls: Calling Command class.
222 args: PerformParallelUploadFileToObjectArgs tuple describing the target.
223 thread_state: gsutil Cloud API instance to use for the operation.
224
225 Returns:
226 StorageUrl representing a successfully uploaded component.
227 """
228 fp = FilePart(args.filename, args.file_start, args.file_length)
229 gsutil_api = GetCloudApiInstance(cls, thread_state=thread_state)
230 with fp:
231 # We take many precautions with the component names that make collisions
232 # effectively impossible. Specifying preconditions will just allow us to
233 # reach a state in which uploads will always fail on retries.
234 preconditions = None
235
236 # Fill in content type if one was provided.
237 dst_object_metadata = apitools_messages.Object(
238 name=args.dst_url.object_name,
239 bucket=args.dst_url.bucket_name,
240 contentType=args.content_type)
241
242 try:
243 if global_copy_helper_opts.canned_acl:
244 # No canned ACL support in JSON, force XML API to be used for
245 # upload/copy operations.
246 orig_prefer_api = gsutil_api.prefer_api
247 gsutil_api.prefer_api = ApiSelector.XML
248 ret = _UploadFileToObject(args.src_url, fp, args.file_length,
249 args.dst_url, dst_object_metadata,
250 preconditions, gsutil_api, cls.logger, cls,
251 _ParallelUploadCopyExceptionHandler,
252 gzip_exts=None, allow_splitting=False)
253 finally:
254 if global_copy_helper_opts.canned_acl:
255 gsutil_api.prefer_api = orig_prefer_api
256
257 component = ret[2]
258 _AppendComponentTrackerToParallelUploadTrackerFile(
259 args.tracker_file, component, args.tracker_file_lock)
260 return ret
261
262
263 CopyHelperOpts = namedtuple('CopyHelperOpts', [
264 'perform_mv',
265 'no_clobber',
266 'daisy_chain',
267 'read_args_from_stdin',
268 'print_ver',
269 'use_manifest',
270 'preserve_acl',
271 'canned_acl',
272 'skip_unsupported_objects',
273 'test_callback_file'])
274
275
276 # pylint: disable=global-variable-undefined
277 def CreateCopyHelperOpts(perform_mv=False, no_clobber=False, daisy_chain=False,
278 read_args_from_stdin=False, print_ver=False,
279 use_manifest=False, preserve_acl=False,
280 canned_acl=None, skip_unsupported_objects=False,
281 test_callback_file=None):
282 """Creates CopyHelperOpts for passing options to CopyHelper."""
283 # We create a tuple with the union of options needed by CopyHelper and any
284 # copy-related functionality in CpCommand, RsyncCommand, or Command class.
285 global global_copy_helper_opts
286 global_copy_helper_opts = CopyHelperOpts(
287 perform_mv=perform_mv,
288 no_clobber=no_clobber,
289 daisy_chain=daisy_chain,
290 read_args_from_stdin=read_args_from_stdin,
291 print_ver=print_ver,
292 use_manifest=use_manifest,
293 preserve_acl=preserve_acl,
294 canned_acl=canned_acl,
295 skip_unsupported_objects=skip_unsupported_objects,
296 test_callback_file=test_callback_file)
297 return global_copy_helper_opts
298
299
300 # pylint: disable=global-variable-undefined
301 # pylint: disable=global-variable-not-assigned
302 def GetCopyHelperOpts():
303 """Returns namedtuple holding CopyHelper options."""
304 global global_copy_helper_opts
305 return global_copy_helper_opts
306
307
308 def _SelectDownloadStrategy(dst_url):
309 """Get download strategy based on the destination object.
310
311 Args:
312 dst_url: Destination StorageUrl.
313
314 Returns:
315 gsutil Cloud API DownloadStrategy.
316 """
317 dst_is_special = False
318 if dst_url.IsFileUrl():
319 # Check explicitly first because os.stat doesn't work on 'nul' in Windows.
320 if dst_url.object_name == os.devnull:
321 dst_is_special = True
322 try:
323 mode = os.stat(dst_url.object_name).st_mode
324 if stat.S_ISCHR(mode):
325 dst_is_special = True
326 except OSError:
327 pass
328
329 if dst_is_special:
330 return CloudApi.DownloadStrategy.ONE_SHOT
331 else:
332 return CloudApi.DownloadStrategy.RESUMABLE
333
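Reviewer note: the special-destination check above can be exercised with the standard library alone; a minimal sketch (the device path in the comment is illustrative).

import os
import stat

def _example_is_special_file(path):
  # Mirrors the logic above: os.devnull, or a character device such as a tty.
  if path == os.devnull:
    return True
  try:
    return stat.S_ISCHR(os.stat(path).st_mode)
  except OSError:
    return False

# _example_is_special_file(os.devnull) -> True; a regular file returns False,
# so it gets the RESUMABLE download strategy.
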
334
335 def _GetUploadTrackerData(tracker_file_name, logger):
336 """Reads tracker data from an upload tracker file if it exists.
337
338 Args:
339 tracker_file_name: Tracker file name for this upload.
340 logger: for outputting log messages.
341
342 Returns:
343 Serialization data if the tracker file already exists (resume existing
344 upload), None otherwise.
345 """
346 tracker_file = None
347
348 # If we already have a matching tracker file, get the serialization data
349 # so that we can resume the upload.
350 try:
351 tracker_file = open(tracker_file_name, 'r')
352 tracker_data = tracker_file.read()
353 return tracker_data
354 except IOError as e:
355 # Ignore non-existent file (happens the first time an upload is attempted on an
356 # object, or when re-starting an upload after a
357 # ResumableUploadStartOverException), but warn user for other errors.
358 if e.errno != errno.ENOENT:
359 logger.warn('Couldn\'t read upload tracker file (%s): %s. Restarting '
360 'upload from scratch.', tracker_file_name, e.strerror)
361 finally:
362 if tracker_file:
363 tracker_file.close()
364
365
366 def InsistDstUrlNamesContainer(exp_dst_url, have_existing_dst_container,
367 command_name):
368 """Ensures the destination URL names a container.
369
370 Acceptable containers include directory, bucket, bucket
371 subdir, and non-existent bucket subdir.
372
373 Args:
374 exp_dst_url: Wildcard-expanded destination StorageUrl.
375 have_existing_dst_container: bool indicator of whether exp_dst_url
376 names a container (directory, bucket, or existing bucket subdir).
377 command_name: Name of command making call. May not be the same as the
378 calling class's self.command_name in the case of commands implemented
379 atop other commands (like mv command).
380
381 Raises:
382 CommandException: if the URL being checked does not name a container.
383 """
384 if ((exp_dst_url.IsFileUrl() and not exp_dst_url.IsDirectory()) or
385 (exp_dst_url.IsCloudUrl() and exp_dst_url.IsBucket()
386 and not have_existing_dst_container)):
387 raise CommandException('Destination URL must name a directory, bucket, '
388 'or bucket\nsubdirectory for the multiple '
389 'source form of the %s command.' % command_name)
390
391
392 def _ShouldTreatDstUrlAsBucketSubDir(have_multiple_srcs, dst_url,
393 have_existing_dest_subdir,
394 src_url_names_container,
395 recursion_requested):
396 """Checks whether dst_url should be treated as a bucket "sub-directory".
397
398 The decision about whether something constitutes a bucket "sub-directory"
399 depends on whether there are multiple sources in this request and whether
400 there is an existing bucket subdirectory. For example, when running the
401 command:
402 gsutil cp file gs://bucket/abc
403 if there's no existing gs://bucket/abc bucket subdirectory we should copy
404 file to the object gs://bucket/abc. In contrast, if
405 there's an existing gs://bucket/abc bucket subdirectory we should copy
406 file to gs://bucket/abc/file. And regardless of whether gs://bucket/abc
407 exists, when running the command:
408 gsutil cp file1 file2 gs://bucket/abc
409 we should copy file1 to gs://bucket/abc/file1 (and similarly for file2).
410 Finally, for recursive copies, if the source is a container then we should
411 copy to a container as the target. For example, when running the command:
412 gsutil cp -r dir1 gs://bucket/dir2
413 we should copy the subtree of dir1 to gs://bucket/dir2.
414
415 Note that we don't disallow naming a bucket "sub-directory" where there's
416 already an object at that URL. For example it's legitimate (albeit
417 confusing) to have an object called gs://bucket/dir and
418 then run the command
419 gsutil cp file1 file2 gs://bucket/dir
420 Doing so will end up with objects gs://bucket/dir, gs://bucket/dir/file1,
421 and gs://bucket/dir/file2.
422
423 Args:
424 have_multiple_srcs: Bool indicator of whether this is a multi-source
425 operation.
426 dst_url: StorageUrl to check.
427 have_existing_dest_subdir: bool indicator whether dest is an existing
428 subdirectory.
429 src_url_names_container: bool indicator of whether the source URL
430 is a container.
431 recursion_requested: True if a recursive operation has been requested.
432
433 Returns:
434 bool indicator.
435 """
436 if have_existing_dest_subdir:
437 return True
438 if dst_url.IsCloudUrl():
439 return (have_multiple_srcs or
440 (src_url_names_container and recursion_requested))
441
442
443 def _ShouldTreatDstUrlAsSingleton(have_multiple_srcs,
444 have_existing_dest_subdir, dst_url,
445 recursion_requested):
446 """Checks that dst_url names a single file/object after wildcard expansion.
447
448 It is possible that an object path might name a bucket sub-directory.
449
450 Args:
451 have_multiple_srcs: Bool indicator of whether this is a multi-source
452 operation.
453 have_existing_dest_subdir: bool indicator whether dest is an existing
454 subdirectory.
455 dst_url: StorageUrl to check.
456 recursion_requested: True if a recursive operation has been requested.
457
458 Returns:
459 bool indicator.
460 """
461 if recursion_requested:
462 return False
463 if dst_url.IsFileUrl():
464 return not dst_url.IsDirectory()
465 else: # dst_url.IsCloudUrl()
466 return (not have_multiple_srcs and
467 not have_existing_dest_subdir and
468 dst_url.IsObject())
469
470
471 def ConstructDstUrl(src_url, exp_src_url, src_url_names_container,
472 have_multiple_srcs, exp_dst_url, have_existing_dest_subdir,
473 recursion_requested):
474 """Constructs the destination URL for a given exp_src_url/exp_dst_url pair.
475
476 Uses context-dependent naming rules that mimic Linux cp and mv behavior.
477
478 Args:
479 src_url: Source StorageUrl to be copied.
480 exp_src_url: Single StorageUrl from wildcard expansion of src_url.
481 src_url_names_container: True if src_url names a container (including the
482 case of a wildcard-named bucket subdir (like gs://bucket/abc,
483 where gs://bucket/abc/* matched some objects)).
484 have_multiple_srcs: True if this is a multi-source request. This can be
485 true if src_url wildcard-expanded to multiple URLs or if there were
486 multiple source URLs in the request.
487 exp_dst_url: the expanded StorageUrl requested for the cp destination.
488 Final written path is constructed from this plus a context-dependent
489 variant of src_url.
490 have_existing_dest_subdir: bool indicator whether dest is an existing
491 subdirectory.
492 recursion_requested: True if a recursive operation has been requested.
493
494 Returns:
495 StorageUrl to use for copy.
496
497 Raises:
498 CommandException if destination object name not specified for
499 source and source is a stream.
500 """
501 if _ShouldTreatDstUrlAsSingleton(
502 have_multiple_srcs, have_existing_dest_subdir, exp_dst_url,
503 recursion_requested):
504 # We're copying one file or object to one file or object.
505 return exp_dst_url
506
507 if exp_src_url.IsFileUrl() and exp_src_url.IsStream():
508 if have_existing_dest_subdir:
509 raise CommandException('Destination object name needed when '
510 'source is a stream')
511 return exp_dst_url
512
513 if not recursion_requested and not have_multiple_srcs:
514 # We're copying one file or object to a subdirectory. Append final comp
515 # of exp_src_url to exp_dst_url.
516 src_final_comp = exp_src_url.object_name.rpartition(src_url.delim)[-1]
517 return StorageUrlFromString('%s%s%s' % (
518 exp_dst_url.url_string.rstrip(exp_dst_url.delim),
519 exp_dst_url.delim, src_final_comp))
520
521 # Else we're copying multiple sources to a directory, bucket, or a bucket
522 # "sub-directory".
523
524 # Ensure exp_dst_url ends in delim char if we're doing a multi-src copy or
525 # a copy to a directory. (The check for copying to a directory needs
526 # special-case handling so that the command:
527 # gsutil cp gs://bucket/obj dir
528 # will turn into file://dir/ instead of file://dir -- the latter would cause
529 # the file "dirobj" to be created.)
530 # Note: need to check have_multiple_srcs or src_url.names_container()
531 # because src_url could be a bucket containing a single object, named
532 # as gs://bucket.
533 if ((have_multiple_srcs or src_url_names_container or
534 (exp_dst_url.IsFileUrl() and exp_dst_url.IsDirectory()))
535 and not exp_dst_url.url_string.endswith(exp_dst_url.delim)):
536 exp_dst_url = StorageUrlFromString('%s%s' % (exp_dst_url.url_string,
537 exp_dst_url.delim))
538
539 # Making naming behavior match how things work with local Linux cp and mv
540 # operations depends on many factors, including whether the destination is a
541 # container, the plurality of the source(s), and whether the mv command is
542 # being used:
543 # 1. For the "mv" command that specifies a non-existent destination subdir,
544 # renaming should occur at the level of the src subdir, vs appending that
545 # subdir beneath the dst subdir like is done for copying. For example:
546 # gsutil rm -r gs://bucket
547 # gsutil cp -r dir1 gs://bucket
548 # gsutil cp -r dir2 gs://bucket/subdir1
549 # gsutil mv gs://bucket/subdir1 gs://bucket/subdir2
550 # would (if using cp naming behavior) end up with paths like:
551 # gs://bucket/subdir2/subdir1/dir2/.svn/all-wcprops
552 # whereas mv naming behavior should result in:
553 # gs://bucket/subdir2/dir2/.svn/all-wcprops
554 # 2. Copying from directories, buckets, or bucket subdirs should result in
555 # objects/files mirroring the source directory hierarchy. For example:
556 # gsutil cp dir1/dir2 gs://bucket
557 # should create the object gs://bucket/dir2/file2, assuming dir1/dir2
558 # contains file2.
559 # To be consistent with Linux cp behavior, there's one more wrinkle when
560 # working with subdirs: The resulting object names depend on whether the
561 # destination subdirectory exists. For example, if gs://bucket/subdir
562 # exists, the command:
563 # gsutil cp -r dir1/dir2 gs://bucket/subdir
564 # should create objects named like gs://bucket/subdir/dir2/a/b/c. In
565 # contrast, if gs://bucket/subdir does not exist, this same command
566 # should create objects named like gs://bucket/subdir/a/b/c.
567 # 3. Copying individual files or objects to dirs, buckets or bucket subdirs
568 # should result in objects/files named by the final source file name
569 # component. Example:
570 # gsutil cp dir1/*.txt gs://bucket
571 # should create the objects gs://bucket/f1.txt and gs://bucket/f2.txt,
572 # assuming dir1 contains f1.txt and f2.txt.
573
574 recursive_move_to_new_subdir = False
575 if (global_copy_helper_opts.perform_mv and recursion_requested
576 and src_url_names_container and not have_existing_dest_subdir):
577 # Case 1. Handle naming rules for bucket subdir mv. Here we want to
578 # line up the src_url against its expansion, to find the base to build
579 # the new name. For example, running the command:
580 # gsutil mv gs://bucket/abcd gs://bucket/xyz
581 # when processing exp_src_url=gs://bucket/abcd/123
582 # exp_src_url_tail should become /123
583 # Note: mv.py code disallows wildcard specification of source URL.
584 recursive_move_to_new_subdir = True
585 exp_src_url_tail = (
586 exp_src_url.url_string[len(src_url.url_string):])
587 dst_key_name = '%s/%s' % (exp_dst_url.object_name.rstrip('/'),
588 exp_src_url_tail.strip('/'))
589
590 elif src_url_names_container and (exp_dst_url.IsCloudUrl() or
591 exp_dst_url.IsDirectory()):
592 # Case 2. Container copy to a destination other than a file.
593 # Build dst_key_name from subpath of exp_src_url past
594 # where src_url ends. For example, for src_url=gs://bucket/ and
595 # exp_src_url=gs://bucket/src_subdir/obj, dst_key_name should be
596 # src_subdir/obj.
597 src_url_path_sans_final_dir = GetPathBeforeFinalDir(src_url)
598 dst_key_name = exp_src_url.versionless_url_string[
599 len(src_url_path_sans_final_dir):].lstrip(src_url.delim)
600 # Handle case where dst_url is a non-existent subdir.
601 if not have_existing_dest_subdir:
602 dst_key_name = dst_key_name.partition(src_url.delim)[-1]
603 # Handle special case where src_url was a directory named with '.' or
604 # './', so that running a command like:
605 # gsutil cp -r . gs://dest
606 # will produce obj names of the form gs://dest/abc instead of
607 # gs://dest/./abc.
608 if dst_key_name.startswith('.%s' % os.sep):
609 dst_key_name = dst_key_name[2:]
610
611 else:
612 # Case 3.
613 dst_key_name = exp_src_url.object_name.rpartition(src_url.delim)[-1]
614
615 if (not recursive_move_to_new_subdir and (
616 exp_dst_url.IsFileUrl() or _ShouldTreatDstUrlAsBucketSubDir(
617 have_multiple_srcs, exp_dst_url, have_existing_dest_subdir,
618 src_url_names_container, recursion_requested))):
619 if exp_dst_url.object_name and exp_dst_url.object_name.endswith(
620 exp_dst_url.delim):
621 dst_key_name = '%s%s%s' % (
622 exp_dst_url.object_name.rstrip(exp_dst_url.delim),
623 exp_dst_url.delim, dst_key_name)
624 else:
625 delim = exp_dst_url.delim if exp_dst_url.object_name else ''
626 dst_key_name = '%s%s%s' % (exp_dst_url.object_name or '',
627 delim, dst_key_name)
628
629 new_exp_dst_url = exp_dst_url.Clone()
630 new_exp_dst_url.object_name = dst_key_name.replace(src_url.delim,
631 exp_dst_url.delim)
632 return new_exp_dst_url
633
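Reviewer note: for the single-source, non-recursive case the naming above reduces to appending the final source path component to the destination; a stdlib-only sketch with hypothetical names.

def _example_single_file_dst_name(src_object_name, dst_prefix, delim='/'):
  # 'dir1/f1.txt' copied under existing subdir 'abc' becomes 'abc/f1.txt'.
  final_comp = src_object_name.rpartition(delim)[-1]
  if not dst_prefix:
    return final_comp
  return dst_prefix.rstrip(delim) + delim + final_comp

# _example_single_file_dst_name('dir1/f1.txt', 'abc') -> 'abc/f1.txt'
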
634
635 def _CreateDigestsFromDigesters(digesters):
636 digests = {}
637 if digesters:
638 for alg in digesters:
639 digests[alg] = base64.encodestring(
640 digesters[alg].digest()).rstrip('\n')
641 return digests
642
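Reviewer note: what the helper above produces for a single MD5 digester, sketched with the standard library only; the input bytes are hypothetical.

import base64
from hashlib import md5

digester = md5('hello world')  # Python 2 str; use b'hello world' on Python 3.
b64_digest = base64.encodestring(digester.digest()).rstrip('\n')
# b64_digest has the same base64 form as the md5Hash field in object metadata.
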
643
644 def _CreateDigestsFromLocalFile(logger, algs, file_name, src_obj_metadata):
645 """Creates a base64 CRC32C and/or MD5 digest from file_name.
646
647 Args:
648 logger: for outputting log messages.
649 algs: list of algorithms to compute.
650 file_name: file to digest.
651 src_obj_metadata: metadata of the source object.
652
653 Returns:
654 Dict of algorithm name : base 64 encoded digest
655 """
656 hash_dict = {}
657 if 'md5' in algs:
658 if src_obj_metadata.size and src_obj_metadata.size > TEN_MIB:
659 logger.info(
660 'Computing MD5 for %s...', file_name)
661 hash_dict['md5'] = md5()
662 if 'crc32c' in algs:
663 hash_dict['crc32c'] = crcmod.predefined.Crc('crc-32c')
664 with open(file_name, 'rb') as fp:
665 CalculateHashesFromContents(
666 fp, hash_dict, ProgressCallbackWithBackoff(
667 src_obj_metadata.size,
668 FileProgressCallbackHandler(
669 ConstructAnnounceText('Hashing', file_name), logger).call))
670 digests = {}
671 for alg_name, digest in hash_dict.iteritems():
672 digests[alg_name] = Base64EncodeHash(digest.hexdigest())
673 return digests
674
675
676 def _CheckCloudHashes(logger, src_url, dst_url, src_obj_metadata,
677 dst_obj_metadata):
678 """Validates integrity of two cloud objects copied via daisy-chain.
679
680 Args:
681 logger: for outputting log messages.
682 src_url: CloudUrl for source cloud object.
683 dst_url: CloudUrl for destination cloud object.
684 src_obj_metadata: Cloud Object metadata for object being downloaded from.
685 dst_obj_metadata: Cloud Object metadata for object being uploaded to.
686
687 Raises:
688 CommandException: if cloud digests don't match local digests.
689 """
690 checked_one = False
691 download_hashes = {}
692 upload_hashes = {}
693 if src_obj_metadata.md5Hash:
694 download_hashes['md5'] = src_obj_metadata.md5Hash
695 if src_obj_metadata.crc32c:
696 download_hashes['crc32c'] = src_obj_metadata.crc32c
697 if dst_obj_metadata.md5Hash:
698 upload_hashes['md5'] = dst_obj_metadata.md5Hash
699 if dst_obj_metadata.crc32c:
700 upload_hashes['crc32c'] = dst_obj_metadata.crc32c
701
702 for alg, upload_b64_digest in upload_hashes.iteritems():
703 if alg not in download_hashes:
704 continue
705
706 download_b64_digest = download_hashes[alg]
707 logger.debug(
708 'Comparing source vs destination %s-checksum for %s. (%s/%s)', alg,
709 dst_url, download_b64_digest, upload_b64_digest)
710 if download_b64_digest != upload_b64_digest:
711 raise HashMismatchException(
712 '%s signature for source object (%s) doesn\'t match '
713 'destination object digest (%s). Object (%s) will be deleted.' % (
714 alg, download_b64_digest, upload_b64_digest, dst_url))
715 checked_one = True
716 if not checked_one:
717 # One known way this can currently happen is when downloading objects larger
718 # than 5 GiB from S3 (for which the etag is not an MD5).
719 logger.warn(
720 'WARNING: Found no hashes to validate object downloaded from %s and '
721 'uploaded to %s. Integrity cannot be assured without hashes.',
722 src_url, dst_url)
723
724
725 def _CheckHashes(logger, obj_url, obj_metadata, file_name, digests,
726 is_upload=False):
727 """Validates integrity by comparing cloud digest to local digest.
728
729 Args:
730 logger: for outputting log messages.
731 obj_url: CloudUrl for cloud object.
732 obj_metadata: Cloud Object being downloaded from or uploaded to.
733 file_name: Local file name on disk being downloaded to or uploaded from.
734 digests: Computed Digests for the object.
735 is_upload: If true, comparing for an uploaded object (controls logging).
736
737 Raises:
738 CommandException: if cloud digests don't match local digests.
739 """
740 local_hashes = digests
741 cloud_hashes = {}
742 if obj_metadata.md5Hash:
743 cloud_hashes['md5'] = obj_metadata.md5Hash.rstrip('\n')
744 if obj_metadata.crc32c:
745 cloud_hashes['crc32c'] = obj_metadata.crc32c.rstrip('\n')
746
747 checked_one = False
748 for alg in local_hashes:
749 if alg not in cloud_hashes:
750 continue
751
752 local_b64_digest = local_hashes[alg]
753 cloud_b64_digest = cloud_hashes[alg]
754 logger.debug(
755 'Comparing local vs cloud %s-checksum for %s. (%s/%s)', alg, file_name,
756 local_b64_digest, cloud_b64_digest)
757 if local_b64_digest != cloud_b64_digest:
758
759 raise HashMismatchException(
760 '%s signature computed for local file (%s) doesn\'t match '
761 'cloud-supplied digest (%s). %s (%s) will be deleted.' % (
762 alg, local_b64_digest, cloud_b64_digest,
763 'Cloud object' if is_upload else 'Local file',
764 obj_url if is_upload else file_name))
765 checked_one = True
766 if not checked_one:
767 if is_upload:
768 logger.warn(
769 'WARNING: Found no hashes to validate object uploaded to %s. '
770 'Integrity cannot be assured without hashes.', obj_url)
771 else:
772 # One known way this can currently happen is when downloading objects larger
773 # than 5 GiB from S3 (for which the etag is not an MD5).
774 logger.warn(
775 'WARNING: Found no hashes to validate object downloaded to %s. '
776 'Integrity cannot be assured without hashes.', file_name)
777
778
779 def IsNoClobberServerException(e):
780 """Checks to see if the server attempted to clobber a file.
781
782 In this case we specified via a precondition that we didn't want the file
783 clobbered.
784
785 Args:
786 e: The Exception that was generated by a failed copy operation
787
788 Returns:
789 bool indicator - True indicates that the server did attempt to clobber
790 an existing file.
791 """
792 return ((isinstance(e, PreconditionException)) or
793 (isinstance(e, ResumableUploadException) and '412' in e.message))
794
795
796 def CheckForDirFileConflict(exp_src_url, dst_url):
797 """Checks whether copying exp_src_url into dst_url is not possible.
798
799 This happens if a directory exists in local file system where a file
800 needs to go or vice versa. In that case we print an error message and
801 exits. Example: if the file "./x" exists and you try to do:
802 gsutil cp gs://mybucket/x/y .
803 the request can't succeed because it requires a directory where
804 the file x exists.
805
806 Note that we don't enforce any corresponding restrictions for buckets,
807 because the flat namespace semantics for buckets doesn't prohibit such
808 cases the way hierarchical file systems do. For example, if a bucket
809 contains an object called gs://bucket/dir and then you run the command:
810 gsutil cp file1 file2 gs://bucket/dir
811 you'll end up with objects gs://bucket/dir, gs://bucket/dir/file1, and
812 gs://bucket/dir/file2.
813
814 Args:
815 exp_src_url: Expanded source StorageUrl.
816 dst_url: Destination StorageUrl.
817
818 Raises:
819 CommandException: if errors encountered.
820 """
821 if dst_url.IsCloudUrl():
822 # The problem can only happen for file destination URLs.
823 return
824 dst_path = dst_url.object_name
825 final_dir = os.path.dirname(dst_path)
826 if os.path.isfile(final_dir):
827 raise CommandException('Cannot retrieve %s because a file exists '
828 'where a directory needs to be created (%s).' %
829 (exp_src_url.url_string, final_dir))
830 if os.path.isdir(dst_path):
831 raise CommandException('Cannot retrieve %s because a directory exists '
832 '(%s) where the file needs to be created.' %
833 (exp_src_url.url_string, dst_path))
834
835
836 def _PartitionFile(fp, file_size, src_url, content_type, canned_acl,
837 dst_bucket_url, random_prefix, tracker_file,
838 tracker_file_lock):
839 """Partitions a file into FilePart objects to be uploaded and later composed.
840
841 These objects, when composed, will match the original file. This entails
842 splitting the file into parts, naming and forming a destination URL for each
843 part, and also providing the PerformParallelUploadFileToObjectArgs
844 corresponding to each part.
845
846 Args:
847 fp: The file object to be partitioned.
848 file_size: The size of fp, in bytes.
849 src_url: Source FileUrl from the original command.
850 content_type: content type for the component and final objects.
851 canned_acl: The user-provided canned_acl, if applicable.
852 dst_bucket_url: CloudUrl for the destination bucket.
853 random_prefix: The randomly-generated prefix used to prevent collisions
854 among the temporary component names.
855 tracker_file: The path to the parallel composite upload tracker file.
856 tracker_file_lock: The lock protecting access to the tracker file.
857
858 Returns:
859 dst_args: Dict of temporary component names to PerformParallelUploadFileToObjectArgs.
860 """
861 parallel_composite_upload_component_size = HumanReadableToBytes(
862 config.get('GSUtil', 'parallel_composite_upload_component_size',
863 DEFAULT_PARALLEL_COMPOSITE_UPLOAD_COMPONENT_SIZE))
864 (num_components, component_size) = _GetPartitionInfo(
865 file_size, MAX_COMPOSE_ARITY, parallel_composite_upload_component_size)
866
867 dst_args = {} # Arguments to create commands and pass to subprocesses.
868 file_names = [] # Used for the 2-step process of forming dst_args.
869 for i in range(num_components):
870 # "Salt" the object name with something a user is very unlikely to have
871 # used in an object name, then hash the extended name to make sure
872 # we don't run into problems with name length. Using a deterministic
873 # naming scheme for the temporary components allows users to take
874 # advantage of resumable uploads for each component.
875 encoded_name = (PARALLEL_UPLOAD_STATIC_SALT + fp.name).encode(UTF8)
876 content_md5 = md5()
877 content_md5.update(encoded_name)
878 digest = content_md5.hexdigest()
879 temp_file_name = (random_prefix + PARALLEL_UPLOAD_TEMP_NAMESPACE +
880 digest + '_' + str(i))
881 tmp_dst_url = dst_bucket_url.Clone()
882 tmp_dst_url.object_name = temp_file_name
883
884 if i < (num_components - 1):
885 # Every component except possibly the last is the same size.
886 file_part_length = component_size
887 else:
888 # The last component just gets all of the remaining bytes.
889 file_part_length = (file_size - ((num_components - 1) * component_size))
890 offset = i * component_size
891 func_args = PerformParallelUploadFileToObjectArgs(
892 fp.name, offset, file_part_length, src_url, tmp_dst_url, canned_acl,
893 content_type, tracker_file, tracker_file_lock)
894 file_names.append(temp_file_name)
895 dst_args[temp_file_name] = func_args
896
897 return dst_args
898
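Reviewer note: the split arithmetic itself lives in _GetPartitionInfo (referenced above but not visible in this hunk); a hedged, stdlib-only sketch of the general idea, with MAX_COMPOSE_ARITY assumed to be the component-count cap.

import math

def _example_partition(file_size, component_size, max_components=32):
  # Hypothetical sketch: cap the number of parts; the last part gets the rest.
  num = min(max_components, int(math.ceil(file_size / float(component_size))))
  size = int(math.ceil(file_size / float(num)))
  last = file_size - (num - 1) * size
  return num, size, last

# _example_partition(100, 30) -> (4, 25, 25)
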
899
900 def _DoParallelCompositeUpload(fp, src_url, dst_url, dst_obj_metadata,
901 canned_acl, file_size, preconditions, gsutil_api,
902 command_obj, copy_exception_handler):
903 """Uploads a local file to a cloud object using parallel composite upload.
904
905 The file is partitioned into parts, and then the parts are uploaded in
906 parallel, composed to form the original destination object, and deleted.
907
908 Args:
909 fp: The file object to be uploaded.
910 src_url: FileUrl representing the local file.
911 dst_url: CloudUrl representing the destination file.
912 dst_obj_metadata: apitools Object describing the destination object.
913 canned_acl: The canned acl to apply to the object, if any.
914 file_size: The size of the source file in bytes.
915 preconditions: Cloud API Preconditions for the final object.
916 gsutil_api: gsutil Cloud API instance to use.
917 command_obj: Command object (for calling Apply).
918 copy_exception_handler: Copy exception handler (for use in Apply).
919
920 Returns:
921 Elapsed upload time, uploaded Object with generation, crc32c, and size
922 fields populated.
923 """
924 start_time = time.time()
925 dst_bucket_url = StorageUrlFromString(dst_url.bucket_url_string)
926 api_selector = gsutil_api.GetApiSelector(provider=dst_url.scheme)
927 # Determine which components, if any, have already been successfully
928 # uploaded.
929 tracker_file = GetTrackerFilePath(dst_url, TrackerFileType.PARALLEL_UPLOAD,
930 api_selector, src_url)
931 tracker_file_lock = CreateLock()
932 (random_prefix, existing_components) = (
933 _ParseParallelUploadTrackerFile(tracker_file, tracker_file_lock))
934
935 # Create the initial tracker file for the upload.
936 _CreateParallelUploadTrackerFile(tracker_file, random_prefix,
937 existing_components, tracker_file_lock)
938
939 # Get the set of all components that should be uploaded.
940 dst_args = _PartitionFile(
941 fp, file_size, src_url, dst_obj_metadata.contentType, canned_acl,
942 dst_bucket_url, random_prefix, tracker_file, tracker_file_lock)
943
944 (components_to_upload, existing_components, existing_objects_to_delete) = (
945 FilterExistingComponents(dst_args, existing_components, dst_bucket_url,
946 gsutil_api))
947
948 # In parallel, copy all of the file parts that haven't already been
949 # uploaded to temporary objects.
950 cp_results = command_obj.Apply(
951 _PerformParallelUploadFileToObject, components_to_upload,
952 copy_exception_handler, ('op_failure_count', 'total_bytes_transferred'),
953 arg_checker=gslib.command.DummyArgChecker,
954 parallel_operations_override=True, should_return_results=True)
955 uploaded_components = []
956 for cp_result in cp_results:
957 uploaded_components.append(cp_result[2])
958 components = uploaded_components + existing_components
959
960 if len(components) == len(dst_args):
961 # Only try to compose if all of the components were uploaded successfully.
962
963 def _GetComponentNumber(component):
964 return int(component.object_name[component.object_name.rfind('_')+1:])
965 # Sort the components so that they will be composed in the correct order.
966 components = sorted(components, key=_GetComponentNumber)
967
968 request_components = []
969 for component_url in components:
970 src_obj_metadata = (
971 apitools_messages.ComposeRequest.SourceObjectsValueListEntry(
972 name=component_url.object_name))
973 if component_url.HasGeneration():
974 src_obj_metadata.generation = long(component_url.generation)
975 request_components.append(src_obj_metadata)
976
977 composed_object = gsutil_api.ComposeObject(
978 request_components, dst_obj_metadata, preconditions=preconditions,
979 provider=dst_url.scheme, fields=['generation', 'crc32c', 'size'])
980
981 try:
982 # Make sure only to delete things that we know were successfully
983 # uploaded (as opposed to all of the objects that we attempted to
984 # create) so that we don't delete any preexisting objects, except for
985 # those that were uploaded by a previous, failed run and have since
986 # changed (but still have an old generation lying around).
987 objects_to_delete = components + existing_objects_to_delete
988 command_obj.Apply(_DeleteObjectFn, objects_to_delete, _RmExceptionHandler,
989 arg_checker=gslib.command.DummyArgChecker,
990 parallel_operations_override=True)
991 except Exception: # pylint: disable=broad-except
992 # If some of the delete calls fail, don't cause the whole command to
993 # fail. The copy was successful iff the compose call succeeded, so
994 # reduce this to a warning.
995 logging.warning(
996 'Failed to delete some of the following temporary objects:\n' +
997 '\n'.join(dst_args.keys()))
998 finally:
999 with tracker_file_lock:
1000 if os.path.exists(tracker_file):
1001 os.unlink(tracker_file)
1002 else:
1003 # Some of the components failed to upload. In this case, we want to exit
1004 # without deleting the objects.
1005 raise CommandException(
1006 'Some temporary components were not uploaded successfully. '
1007 'Please retry this upload.')
1008
1009 elapsed_time = time.time() - start_time
1010 return elapsed_time, composed_object
1011
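Reviewer note: the numeric key in _GetComponentNumber above matters because a plain lexicographic sort would order '_10' before '_2'; a tiny sketch with hypothetical component names.

names = ['abc_10', 'abc_2', 'abc_0']
sorted(names)                                           # ['abc_0', 'abc_10', 'abc_2']
sorted(names, key=lambda n: int(n[n.rfind('_') + 1:]))  # ['abc_0', 'abc_2', 'abc_10']
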
1012
1013 def _ShouldDoParallelCompositeUpload(logger, allow_splitting, src_url, dst_url,
1014 file_size, canned_acl=None):
1015 """Determines whether parallel composite upload strategy should be used.
1016
1017 Args:
1018 logger: for outputting log messages.
1019 allow_splitting: If false, then this function returns false.
1020 src_url: FileUrl corresponding to a local file.
1021 dst_url: CloudUrl corresponding to destination cloud object.
1022 file_size: The size of the source file, in bytes.
1023 canned_acl: Canned ACL to apply to destination object, if any.
1024
1025 Returns:
1026 True iff a parallel upload should be performed on the source file.
1027 """
1028 global suggested_parallel_composites
1029 parallel_composite_upload_threshold = HumanReadableToBytes(config.get(
1030 'GSUtil', 'parallel_composite_upload_threshold',
1031 DEFAULT_PARALLEL_COMPOSITE_UPLOAD_THRESHOLD))
1032
1033 all_factors_but_size = (
1034 allow_splitting # Don't split the pieces multiple times.
1035 and not src_url.IsStream() # We can't partition streams.
1036 and dst_url.scheme == 'gs' # Compose is only for gs.
1037 and not canned_acl) # TODO: Implement canned ACL support for compose.
1038
1039 # Since parallel composite uploads are disabled by default, make the user
1040 # aware of them.
1041 # TODO: Once compiled crcmod is being distributed by major Linux distributions
1042 # remove this check.
1043 if (all_factors_but_size and parallel_composite_upload_threshold == 0
1044 and file_size >= PARALLEL_COMPOSITE_SUGGESTION_THRESHOLD
1045 and not suggested_parallel_composites):
1046 logger.info('\n'.join(textwrap.wrap(
1047 '==> NOTE: You are uploading one or more large file(s), which would '
1048 'run significantly faster if you enable parallel composite uploads. '
1049 'This feature can be enabled by editing the '
1050 '"parallel_composite_upload_threshold" value in your .boto '
1051 'configuration file. However, note that if you do this you and any '
1052 'users that download such composite files will need to have a compiled '
1053 'crcmod installed (see "gsutil help crcmod").')) + '\n')
1054 suggested_parallel_composites = True
1055
1056 return (all_factors_but_size
1057 and parallel_composite_upload_threshold > 0
1058 and file_size >= parallel_composite_upload_threshold)
1059
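Reviewer note: the two boto settings consulted above, shown as a hedged example; the values are illustrative, not recommendations.

# In the user's .boto configuration file:
#   [GSUtil]
#   parallel_composite_upload_threshold = 150M
#   parallel_composite_upload_component_size = 50M
#
# A threshold of 0 (the default) disables parallel composite uploads, which is
# what triggers the suggestion message above for large files.
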
1060
1061 def ExpandUrlToSingleBlr(url_str, gsutil_api, debug, project_id,
1062 treat_nonexistent_object_as_subdir=False):
1063 """Expands wildcard if present in url_str.
1064
1065 Args:
1066 url_str: String representation of requested url.
1067 gsutil_api: gsutil Cloud API instance to use.
1068 debug: debug level to use (for iterators).
1069 project_id: project ID to use (for iterators).
1070 treat_nonexistent_object_as_subdir: indicates whether a non-existent
1071 object should be treated as a subdir.
1072
1073 Returns:
1074 (exp_url, have_existing_dst_container)
1075 where exp_url is a StorageUrl
1076 and have_existing_dst_container is a bool indicating whether
1077 exp_url names an existing directory, bucket, or bucket subdirectory.
1078 In the case where we match a subdirectory AND an object, the
1079 object is returned.
1080
1081 Raises:
1082 CommandException: if url_str matched more than 1 URL.
1083 """
1084 # Handle wildcarded url case.
1085 if ContainsWildcard(url_str):
1086 blr_expansion = list(CreateWildcardIterator(url_str, gsutil_api,
1087 debug=debug,
1088 project_id=project_id))
1089 if len(blr_expansion) != 1:
1090 raise CommandException('Destination (%s) must match exactly 1 URL' %
1091 url_str)
1092 blr = blr_expansion[0]
1093 # BLR is either an OBJECT, PREFIX, or BUCKET; the latter two represent
1094 # directories.
1095 return (StorageUrlFromString(blr.url_string), not blr.IsObject())
1096
1097 storage_url = StorageUrlFromString(url_str)
1098
1099 # Handle non-wildcarded URL.
1100 if storage_url.IsFileUrl():
1101 return (storage_url, storage_url.IsDirectory())
1102
1103 # At this point we have a cloud URL.
1104 if storage_url.IsBucket():
1105 return (storage_url, True)
1106
1107 # For object/prefix URLs check 3 cases: (a) if the name ends with '/' treat
1108 # as a subdir; otherwise, use the wildcard iterator with url to
1109 # find if (b) there's a Prefix matching url, or (c) name is of form
1110 # dir_$folder$ (and in both these cases also treat dir as a subdir).
1111 # Cloud subdirs are always considered to be an existing container.
1112 if IsCloudSubdirPlaceholder(storage_url):
1113 return (storage_url, True)
1114
1115 # Check for the special case where we have a folder marker object.
1116 folder_expansion = CreateWildcardIterator(
1117 storage_url.versionless_url_string + '_$folder$', gsutil_api,
1118 debug=debug, project_id=project_id).IterAll(
1119 bucket_listing_fields=['name'])
1120 for blr in folder_expansion:
1121 return (storage_url, True)
1122
1123 blr_expansion = CreateWildcardIterator(url_str, gsutil_api,
1124 debug=debug,
1125 project_id=project_id).IterAll(
1126 bucket_listing_fields=['name'])
1127 expansion_empty = True
1128 for blr in blr_expansion:
1129 expansion_empty = False
1130 if blr.IsPrefix():
1131 return (storage_url, True)
1132
1133 return (storage_url,
1134 expansion_empty and treat_nonexistent_object_as_subdir)
1135
1136
1137 def FixWindowsNaming(src_url, dst_url):
1138 """Translates Windows pathnames to cloud pathnames.
1139
1140 Rewrites the destination URL built by ConstructDstUrl().
1141
1142 Args:
1143 src_url: Source StorageUrl to be copied.
1144 dst_url: The destination StorageUrl built by ConstructDstUrl().
1145
1146 Returns:
1147 StorageUrl to use for copy.
1148 """
1149 if (src_url.IsFileUrl() and src_url.delim == '\\'
1150 and dst_url.IsCloudUrl()):
1151 trans_url_str = re.sub(r'\\', '/', dst_url.url_string)
1152 dst_url = StorageUrlFromString(trans_url_str)
1153 return dst_url
1154
1155
1156 def SrcDstSame(src_url, dst_url):
1157 """Checks if src_url and dst_url represent the same object or file.
1158
1159 We don't handle anything about hard or symbolic links.
1160
1161 Args:
1162 src_url: Source StorageUrl.
1163 dst_url: Destination StorageUrl.
1164
1165 Returns:
1166 Bool indicator.
1167 """
1168 if src_url.IsFileUrl() and dst_url.IsFileUrl():
1169 # Translate a/b/./c to a/b/c, so src=dst comparison below works.
1170 new_src_path = os.path.normpath(src_url.object_name)
1171 new_dst_path = os.path.normpath(dst_url.object_name)
1172 return new_src_path == new_dst_path
1173 else:
1174 return (src_url.url_string == dst_url.url_string and
1175 src_url.generation == dst_url.generation)
1176
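Reviewer note: the local-file comparison above is just path normalization; a one-line sketch.

import os

# Both spellings name the same local file, so the copy would be flagged as src==dst.
os.path.normpath('a/b/./c') == os.path.normpath('a/b/c')  # True
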
1177
1178 def _LogCopyOperation(logger, src_url, dst_url, dst_obj_metadata):
1179 """Logs copy operation, including Content-Type if appropriate.
1180
1181 Args:
1182 logger: logger instance to use for output.
1183 src_url: Source StorageUrl.
1184 dst_url: Destination StorageUrl.
1185 dst_obj_metadata: Object-specific metadata that should be overridden during
1186 the copy.
1187 """
1188 if (dst_url.IsCloudUrl() and dst_obj_metadata and
1189 dst_obj_metadata.contentType):
1190 content_type_msg = ' [Content-Type=%s]' % dst_obj_metadata.contentType
1191 else:
1192 content_type_msg = ''
1193 if src_url.IsFileUrl() and src_url.IsStream():
1194 logger.info('Copying from <STDIN>%s...', content_type_msg)
1195 else:
1196 logger.info('Copying %s%s...', src_url.url_string, content_type_msg)
1197
1198
1199 # pylint: disable=undefined-variable
1200 def _CopyObjToObjInTheCloud(src_url, src_obj_metadata, dst_url,
1201 dst_obj_metadata, preconditions, gsutil_api,
1202 logger):
1203 """Performs copy-in-the cloud from specified src to dest object.
1204
1205 Args:
1206 src_url: Source CloudUrl.
1207 src_obj_metadata: Metadata for source object; must include etag and size.
1208 dst_url: Destination CloudUrl.
1209 dst_obj_metadata: Object-specific metadata that should be overridden during
1210 the copy.
1211 preconditions: Preconditions to use for the copy.
1212 gsutil_api: gsutil Cloud API instance to use for the copy.
1213 logger: logging.Logger for log message output.
1214
1215 Returns:
1216 (elapsed_time, bytes_transferred, dst_url with generation,
1217 md5 hash of destination) excluding overhead like initial GET.
1218
1219 Raises:
1220 CommandException: if errors encountered.
1221 """
1222 start_time = time.time()
1223
1224 progress_callback = FileProgressCallbackHandler(
1225 ConstructAnnounceText('Copying', dst_url.url_string), logger).call
1226 if global_copy_helper_opts.test_callback_file:
1227 with open(global_copy_helper_opts.test_callback_file, 'rb') as test_fp:
1228 progress_callback = pickle.loads(test_fp.read()).call
1229 dst_obj = gsutil_api.CopyObject(
1230 src_obj_metadata, dst_obj_metadata, src_generation=src_url.generation,
1231 canned_acl=global_copy_helper_opts.canned_acl,
1232 preconditions=preconditions, progress_callback=progress_callback,
1233 provider=dst_url.scheme, fields=UPLOAD_RETURN_FIELDS)
1234
1235 end_time = time.time()
1236
1237 result_url = dst_url.Clone()
1238 result_url.generation = GenerationFromUrlAndString(result_url,
1239 dst_obj.generation)
1240
1241 return (end_time - start_time, src_obj_metadata.size, result_url,
1242 dst_obj.md5Hash)
1243
1244
1245 def _CheckFreeSpace(path):
1246 """Return path/drive free space (in bytes)."""
1247 if IS_WINDOWS:
1248 # pylint: disable=g-import-not-at-top
1249 try:
1250 # pylint: disable=invalid-name
1251 get_disk_free_space_ex = WINFUNCTYPE(c_int, c_wchar_p,
1252 POINTER(c_uint64),
1253 POINTER(c_uint64),
1254 POINTER(c_uint64))
1255 get_disk_free_space_ex = get_disk_free_space_ex(
1256 ('GetDiskFreeSpaceExW', windll.kernel32), (
1257 (1, 'lpszPathName'),
1258 (2, 'lpFreeUserSpace'),
1259 (2, 'lpTotalSpace'),
1260 (2, 'lpFreeSpace'),))
1261 except AttributeError:
1262 get_disk_free_space_ex = WINFUNCTYPE(c_int, c_char_p,
1263 POINTER(c_uint64),
1264 POINTER(c_uint64),
1265 POINTER(c_uint64))
1266 get_disk_free_space_ex = get_disk_free_space_ex(
1267 ('GetDiskFreeSpaceExA', windll.kernel32), (
1268 (1, 'lpszPathName'),
1269 (2, 'lpFreeUserSpace'),
1270 (2, 'lpTotalSpace'),
1271 (2, 'lpFreeSpace'),))
1272
1273 def GetDiskFreeSpaceExErrCheck(result, unused_func, args):
1274 if not result:
1275 raise WinError()
1276 return args[1].value
1277 get_disk_free_space_ex.errcheck = GetDiskFreeSpaceExErrCheck
1278
1279 return get_disk_free_space_ex(os.getenv('SystemDrive'))
1280 else:
1281 (_, f_frsize, _, _, f_bavail, _, _, _, _, _) = os.statvfs(path)
1282 return f_frsize * f_bavail
1283
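Reviewer note: the non-Windows branch is plain statvfs arithmetic; a stdlib-only sketch (the path is illustrative).

import os

def _example_free_bytes(path='/tmp'):
  st = os.statvfs(path)  # POSIX only.
  # Fragment size times blocks available to unprivileged users.
  return st.f_frsize * st.f_bavail
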
1284
1285 def _SetContentTypeFromFile(src_url, dst_obj_metadata):
1286 """Detects and sets Content-Type if src_url names a local file.
1287
1288 Args:
1289 src_url: Source StorageUrl.
1290 dst_obj_metadata: Object-specific metadata that should be overridden during
1291 the copy.
1292 """
1293 # contentType == '' if user requested default type.
1294 if (dst_obj_metadata.contentType is None and src_url.IsFileUrl()
1295 and not src_url.IsStream()):
1296 # Only do content type recognition if src_url is a file. Object-to-object
1297 # copies with no -h Content-Type specified re-use the content type of the
1298 # source object.
1299 object_name = src_url.object_name
1300 content_type = None
1301 # Streams (denoted by '-') are expected to be 'application/octet-stream'
1302 # and 'file' would partially consume them.
1303 if object_name != '-':
1304 if config.getbool('GSUtil', 'use_magicfile', False):
1305 p = subprocess.Popen(['file', '--mime-type', object_name],
1306 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
1307 output, error = p.communicate()
1308 p.stdout.close()
1309 p.stderr.close()
1310 if p.returncode != 0 or error:
1311 raise CommandException(
1312 'Encountered error running "file --mime-type %s" '
1313 '(returncode=%d).\n%s' % (object_name, p.returncode, error))
1314 # Parse output by removing the line delimiter and splitting on the last ': '.
1315 content_type = output.rstrip().rpartition(': ')[2]
1316 else:
1317 content_type = mimetypes.guess_type(object_name)[0]
1318 if not content_type:
1319 content_type = DEFAULT_CONTENT_TYPE
1320 dst_obj_metadata.contentType = content_type
1321
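Reviewer note: when use_magicfile is off, the detection above boils down to mimetypes; a minimal sketch with a hypothetical file name.

import mimetypes

content_type = mimetypes.guess_type('report.pdf')[0]  # -> 'application/pdf'
if not content_type:
  content_type = 'application/octet-stream'  # i.e. fall back to DEFAULT_CONTENT_TYPE.
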
1322
1323 # pylint: disable=undefined-variable
1324 def _UploadFileToObjectNonResumable(src_url, src_obj_filestream,
1325 src_obj_size, dst_url, dst_obj_metadata,
1326 preconditions, gsutil_api, logger):
1327 """Uploads the file using a non-resumable strategy.
1328
1329 Args:
1330 src_url: Source StorageUrl to upload.
1331 src_obj_filestream: File pointer to uploadable bytes.
1332 src_obj_size: Size of the source object.
1333 dst_url: Destination StorageUrl for the upload.
1334 dst_obj_metadata: Metadata for the target object.
1335 preconditions: Preconditions for the upload, if any.
1336 gsutil_api: gsutil Cloud API instance to use for the upload.
1337 logger: For outputting log messages.
1338
1339 Returns:
1340 Elapsed upload time, uploaded Object with generation, md5, and size fields
1341 populated.
1342 """
1343 progress_callback = FileProgressCallbackHandler(
1344 ConstructAnnounceText('Uploading', dst_url.url_string), logger).call
1345 if global_copy_helper_opts.test_callback_file:
1346 with open(global_copy_helper_opts.test_callback_file, 'rb') as test_fp:
1347 progress_callback = pickle.loads(test_fp.read()).call
1348 start_time = time.time()
1349
1350 if src_url.IsStream():
1351 # TODO: gsutil-beta: Provide progress callbacks for streaming uploads.
1352 uploaded_object = gsutil_api.UploadObjectStreaming(
1353 src_obj_filestream, object_metadata=dst_obj_metadata,
1354 canned_acl=global_copy_helper_opts.canned_acl,
1355 preconditions=preconditions, progress_callback=progress_callback,
1356 provider=dst_url.scheme, fields=UPLOAD_RETURN_FIELDS)
1357 else:
1358 uploaded_object = gsutil_api.UploadObject(
1359 src_obj_filestream, object_metadata=dst_obj_metadata,
1360 canned_acl=global_copy_helper_opts.canned_acl, size=src_obj_size,
1361 preconditions=preconditions, progress_callback=progress_callback,
1362 provider=dst_url.scheme, fields=UPLOAD_RETURN_FIELDS)
1363 end_time = time.time()
1364 elapsed_time = end_time - start_time
1365
1366 return elapsed_time, uploaded_object
1367
1368
1369 # pylint: disable=undefined-variable
1370 def _UploadFileToObjectResumable(src_url, src_obj_filestream,
1371 src_obj_size, dst_url, dst_obj_metadata,
1372 preconditions, gsutil_api, logger):
1373 """Uploads the file using a resumable strategy.
1374
1375 Args:
1376 src_url: Source FileUrl to upload. Must not be a stream.
1377 src_obj_filestream: File pointer to uploadable bytes.
1378 src_obj_size: Size of the source object.
1379 dst_url: Destination StorageUrl for the upload.
1380 dst_obj_metadata: Metadata for the target object.
1381 preconditions: Preconditions for the upload, if any.
1382 gsutil_api: gsutil Cloud API instance to use for the upload.
1383 logger: for outputting log messages.
1384
1385 Returns:
1386 Elapsed upload time, uploaded Object with generation, md5, and size fields
1387 populated.
1388 """
1389 tracker_file_name = GetTrackerFilePath(
1390 dst_url, TrackerFileType.UPLOAD,
1391 gsutil_api.GetApiSelector(provider=dst_url.scheme))
1392
1393 def _UploadTrackerCallback(serialization_data):
1394 """Creates a new tracker file for starting an upload from scratch.
1395
1396 This function is called by the gsutil Cloud API implementation, and the
1397 serialization data is implementation-specific.
1398
1399 Args:
1400 serialization_data: Serialization data used in resuming the upload.
1401 """
1402 tracker_file = None
1403 try:
1404 tracker_file = open(tracker_file_name, 'w')
1405 tracker_file.write(str(serialization_data))
1406 except IOError as e:
1407 RaiseUnwritableTrackerFileException(tracker_file_name, e.strerror)
1408 finally:
1409 if tracker_file:
1410 tracker_file.close()
1411
1412 # This contains the upload URL, which will uniquely identify the
1413 # destination object.
1414 tracker_data = _GetUploadTrackerData(tracker_file_name, logger)
1415 if tracker_data:
1416 logger.info(
1417 'Resuming upload for %s', src_url.url_string)
1418
1419 retryable = True
1420
1421 progress_callback = FileProgressCallbackHandler(
1422 ConstructAnnounceText('Uploading', dst_url.url_string), logger).call
1423 if global_copy_helper_opts.test_callback_file:
1424 with open(global_copy_helper_opts.test_callback_file, 'rb') as test_fp:
1425 progress_callback = pickle.loads(test_fp.read()).call
1426
1427 start_time = time.time()
1428 num_startover_attempts = 0
1429 # This loop causes us to retry when the resumable upload failed in a way that
1430 # requires starting over with a new upload ID. Retries within a single upload
1431 # ID within the current process are handled in
1432 # gsutil_api.UploadObjectResumable, and retries within a single upload ID
1433 # spanning processes happen if an exception not caught below occurs (which
1434 # will leave the tracker file in place and cause the upload ID to be reused
1435 # the next time the user runs gsutil and attempts the same upload).
1436 while retryable:
1437 try:
1438 uploaded_object = gsutil_api.UploadObjectResumable(
1439 src_obj_filestream, object_metadata=dst_obj_metadata,
1440 canned_acl=global_copy_helper_opts.canned_acl,
1441 preconditions=preconditions, provider=dst_url.scheme,
1442 size=src_obj_size, serialization_data=tracker_data,
1443 fields=UPLOAD_RETURN_FIELDS,
1444 tracker_callback=_UploadTrackerCallback,
1445 progress_callback=progress_callback)
1446 retryable = False
1447 except ResumableUploadStartOverException as e:
1448 # This can happen, for example, if the server sends a 410 response code.
1449 # In that case the current resumable upload ID can't be reused, so delete
1450 # the tracker file and try again up to max retries.
1451 num_startover_attempts += 1
1452 retryable = (num_startover_attempts < GetNumRetries())
1453 if not retryable:
1454 raise
1455
1456 # If the server sends a 404 response code, then the upload should only
1457 # be restarted if it was the object (and not the bucket) that was missing.
1458 try:
1459 gsutil_api.GetBucket(dst_obj_metadata.bucket, provider=dst_url.scheme)
1460 except NotFoundException:
1461 raise
1462
1463 logger.info('Restarting upload from scratch after exception %s', e)
1464 DeleteTrackerFile(tracker_file_name)
1465 tracker_data = None
1466 src_obj_filestream.seek(0)
1467 # Reset the progress callback handler.
1468 progress_callback = FileProgressCallbackHandler(
1469 ConstructAnnounceText('Uploading', dst_url.url_string), logger).call
1470 logger.info('\n'.join(textwrap.wrap(
1471 'Resumable upload of %s failed with a response code indicating we '
1472 'need to start over with a new resumable upload ID. Backing off '
1473 'and retrying.' % src_url.url_string)))
1474 time.sleep(min(random.random() * (2 ** num_startover_attempts),
1475 GetMaxRetryDelay()))
1476 except ResumableUploadAbortException:
1477 retryable = False
1478 raise
1479 finally:
1480 if not retryable:
1481 DeleteTrackerFile(tracker_file_name)
1482
1483 end_time = time.time()
1484 elapsed_time = end_time - start_time
1485
1486 return (elapsed_time, uploaded_object)
1487
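# A minimal sketch (not part of gsutil) of the start-over backoff used above:
# the sleep grows roughly as random() * 2**attempt, capped at a maximum delay.
# The 32-second cap here is an illustrative stand-in for GetMaxRetryDelay().
def _ExampleStartOverBackoffDelays(num_attempts=5, max_delay=32):
  import random
  delays = []
  for attempt in range(1, num_attempts + 1):
    delays.append(min(random.random() * (2 ** attempt), max_delay))
  return delays  # Random values, each capped at max_delay seconds.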
1488
1489 def _CompressFileForUpload(src_url, src_obj_filestream, src_obj_size, logger):
1490 """Compresses a to-be-uploaded local file to save bandwidth.
1491
1492 Args:
1493 src_url: Source FileUrl.
1494 src_obj_filestream: Read stream of the source file - will be consumed
1495 and closed.
1496 src_obj_size: Size of the source file.
1497 logger: for outputting log messages.
1498
1499 Returns:
1500 StorageUrl path to compressed file, compressed file size.
1501 """
1502 # TODO: Compress using a streaming model as opposed to all at once here.
1503 if src_obj_size >= MIN_SIZE_COMPUTE_LOGGING:
1504 logger.info(
1505 'Compressing %s (to tmp)...', src_url)
1506 (gzip_fh, gzip_path) = tempfile.mkstemp()
1507 gzip_fp = None
1508 try:
1509 # Check for temp space. Assume the compressed object is at most 2x
1510 # the size of the object (normally should compress to smaller than
1511 # the object)
1512 if _CheckFreeSpace(gzip_path) < 2*int(src_obj_size):
1513 raise CommandException('Inadequate temp space available to compress '
1514 '%s. See the CHANGING TEMP DIRECTORIES section '
1515 'of "gsutil help cp" for more info.' % src_url)
1516 gzip_fp = gzip.open(gzip_path, 'wb')
1517 data = src_obj_filestream.read(GZIP_CHUNK_SIZE)
1518 while data:
1519 gzip_fp.write(data)
1520 data = src_obj_filestream.read(GZIP_CHUNK_SIZE)
1521 finally:
1522 if gzip_fp:
1523 gzip_fp.close()
1524 os.close(gzip_fh)
1525 src_obj_filestream.close()
1526 gzip_size = os.path.getsize(gzip_path)
1527 return StorageUrlFromString(gzip_path), gzip_size
1528
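# A minimal sketch (not part of gsutil) of the chunked compression pattern in
# _CompressFileForUpload above: read fixed-size chunks from a source stream
# and write them through gzip into a temp file. The 8 KiB chunk size is
# illustrative; the real code uses GZIP_CHUNK_SIZE.
def _ExampleGzipStreamToTempFile(src_fp, chunk_size=8192):
  import gzip
  import os
  import tempfile
  (tmp_fd, tmp_path) = tempfile.mkstemp()
  gzip_fp = gzip.open(tmp_path, 'wb')
  try:
    data = src_fp.read(chunk_size)
    while data:
      gzip_fp.write(data)
      data = src_fp.read(chunk_size)
  finally:
    gzip_fp.close()
    os.close(tmp_fd)
  return tmp_path, os.path.getsize(tmp_path)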
1529
1530 def _UploadFileToObject(src_url, src_obj_filestream, src_obj_size,
1531 dst_url, dst_obj_metadata, preconditions, gsutil_api,
1532 logger, command_obj, copy_exception_handler,
1533 gzip_exts=None, allow_splitting=True):
1534 """Uploads a local file to an object.
1535
1536 Args:
1537 src_url: Source FileUrl.
1538 src_obj_filestream: Read stream of the source file to be read and closed.
1539 src_obj_size: Size of the source file.
1540 dst_url: Destination CloudUrl.
1541 dst_obj_metadata: Metadata to be applied to the destination object.
1542 preconditions: Preconditions to use for the copy.
1543 gsutil_api: gsutil Cloud API to use for the copy.
1544 logger: for outputting log messages.
1545 command_obj: command object for use in Apply in parallel composite uploads.
1546 copy_exception_handler: For handling copy exceptions during Apply.
1547 gzip_exts: List of file extensions to gzip prior to upload, if any.
1548 allow_splitting: Whether to allow the file to be split into component
1549 pieces for a parallel composite upload.
1550
1551 Returns:
1552 (elapsed_time, bytes_transferred, dst_url with generation,
1553 md5 hash of destination) excluding overhead like initial GET.
1554
1555 Raises:
1556 CommandException: if errors encountered.
1557 """
1558 if not dst_obj_metadata or not dst_obj_metadata.contentLanguage:
1559 content_language = config.get_value('GSUtil', 'content_language')
1560 if content_language:
1561 dst_obj_metadata.contentLanguage = content_language
1562
1563 fname_parts = src_url.object_name.split('.')
1564 upload_url = src_url
1565 upload_stream = src_obj_filestream
1566 upload_size = src_obj_size
1567 zipped_file = False
1568 if gzip_exts and len(fname_parts) > 1 and fname_parts[-1] in gzip_exts:
1569 upload_url, upload_size = _CompressFileForUpload(
1570 src_url, src_obj_filestream, src_obj_size, logger)
1571 upload_stream = open(upload_url.object_name, 'rb')
1572 dst_obj_metadata.contentEncoding = 'gzip'
1573 zipped_file = True
1574
1575 elapsed_time = None
1576 uploaded_object = None
1577 hash_algs = GetUploadHashAlgs()
1578 digesters = dict((alg, hash_algs[alg]()) for alg in hash_algs or {})
1579
1580 parallel_composite_upload = _ShouldDoParallelCompositeUpload(
1581 logger, allow_splitting, upload_url, dst_url, src_obj_size,
1582 canned_acl=global_copy_helper_opts.canned_acl)
1583
1584 if (src_url.IsStream() and
1585 gsutil_api.GetApiSelector(provider=dst_url.scheme) == ApiSelector.JSON):
1586 orig_stream = upload_stream
1587 # Add limited seekable properties to the stream via buffering.
1588 upload_stream = ResumableStreamingJsonUploadWrapper(
1589 orig_stream, GetJsonResumableChunkSize())
1590
1591 if not parallel_composite_upload and len(hash_algs):
1592 # Parallel composite uploads calculate hashes per-component in subsequent
1593 # calls to this function, but the composition of the final object is a
1594 # cloud-only operation.
1595 wrapped_filestream = HashingFileUploadWrapper(upload_stream, digesters,
1596 hash_algs, upload_url, logger)
1597 else:
1598 wrapped_filestream = upload_stream
1599
1600 try:
1601 if parallel_composite_upload:
1602 elapsed_time, uploaded_object = _DoParallelCompositeUpload(
1603 upload_stream, upload_url, dst_url, dst_obj_metadata,
1604 global_copy_helper_opts.canned_acl, upload_size, preconditions,
1605 gsutil_api, command_obj, copy_exception_handler)
1606 elif upload_size < ResumableThreshold() or src_url.IsStream():
1607 elapsed_time, uploaded_object = _UploadFileToObjectNonResumable(
1608 upload_url, wrapped_filestream, upload_size, dst_url,
1609 dst_obj_metadata, preconditions, gsutil_api, logger)
1610 else:
1611 elapsed_time, uploaded_object = _UploadFileToObjectResumable(
1612 upload_url, wrapped_filestream, upload_size, dst_url,
1613 dst_obj_metadata, preconditions, gsutil_api, logger)
1614
1615 finally:
1616 if zipped_file:
1617 try:
1618 os.unlink(upload_url.object_name)
1619 # Windows sometimes complains the temp file is locked when you try to
1620 # delete it.
1621 except Exception: # pylint: disable=broad-except
1622 logger.warning(
1623 'Could not delete %s. This can occur in Windows because the '
1624 'temporary file is still locked.', upload_url.object_name)
1625 # In the gzip case, this is the gzip stream. _CompressFileForUpload will
1626 # have already closed the original source stream.
1627 upload_stream.close()
1628
1629 if not parallel_composite_upload:
1630 try:
1631 digests = _CreateDigestsFromDigesters(digesters)
1632 _CheckHashes(logger, dst_url, uploaded_object, src_url.object_name,
1633 digests, is_upload=True)
1634 except HashMismatchException:
1635 if _RENAME_ON_HASH_MISMATCH:
1636 corrupted_obj_metadata = apitools_messages.Object(
1637 name=dst_obj_metadata.name,
1638 bucket=dst_obj_metadata.bucket,
1639 etag=uploaded_object.etag)
1640 dst_obj_metadata.name = (dst_url.object_name +
1641 _RENAME_ON_HASH_MISMATCH_SUFFIX)
1642 gsutil_api.CopyObject(corrupted_obj_metadata,
1643 dst_obj_metadata, provider=dst_url.scheme)
1644 # If the digest doesn't match, delete the object.
1645 gsutil_api.DeleteObject(dst_url.bucket_name, dst_url.object_name,
1646 generation=uploaded_object.generation,
1647 provider=dst_url.scheme)
1648 raise
1649
1650 result_url = dst_url.Clone()
1651
1652 result_url.generation = uploaded_object.generation
1653 result_url.generation = GenerationFromUrlAndString(
1654 result_url, uploaded_object.generation)
1655
1656 return (elapsed_time, uploaded_object.size, result_url,
1657 uploaded_object.md5Hash)
1658
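# A minimal sketch (not part of gsutil) of the gzip_exts matching rule used in
# _UploadFileToObject above: only the final extension is compared, so a name
# with no dot never matches. The file names and extension list are made up.
def _ExampleShouldGzip(file_name, gzip_exts):
  fname_parts = file_name.split('.')
  return bool(gzip_exts) and len(fname_parts) > 1 and fname_parts[-1] in gzip_exts
# _ExampleShouldGzip('app.min.js', ['js', 'css']) -> True
# _ExampleShouldGzip('README', ['js', 'css'])     -> False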
1659
1660 # TODO: Refactor this long function into smaller pieces.
1661 # pylint: disable=too-many-statements
1662 def _DownloadObjectToFile(src_url, src_obj_metadata, dst_url,
1663 gsutil_api, logger, test_method=None):
1664 """Downloads an object to a local file.
1665
1666 Args:
1667 src_url: Source CloudUrl.
1668 src_obj_metadata: Metadata from the source object.
1669 dst_url: Destination FileUrl.
1670 gsutil_api: gsutil Cloud API instance to use for the download.
1671 logger: for outputting log messages.
1672 test_method: Optional test method for modifying the file before validation
1673 during unit tests.
1674 Returns:
1675 (elapsed_time, bytes_transferred, dst_url, md5), excluding overhead like
1676 initial GET.
1677
1678 Raises:
1679 CommandException: if errors encountered.
1680 """
1681 global open_files_map
1682 file_name = dst_url.object_name
1683 dir_name = os.path.dirname(file_name)
1684 if dir_name and not os.path.exists(dir_name):
1685 # Do dir creation in try block so can ignore case where dir already
1686 # exists. This is needed to avoid a race condition when running gsutil
1687 # -m cp.
1688 try:
1689 os.makedirs(dir_name)
1690 except OSError as e:
1691 if e.errno != errno.EEXIST:
1692 raise
1693 api_selector = gsutil_api.GetApiSelector(provider=src_url.scheme)
1694 # For gzipped objects, download to a temp file and unzip. For the XML API,
1695 # this represents the result of a HEAD request. For the JSON API, this is
1696 # the stored encoding, which the service may not respect. However, if the
1697 # server sends decompressed bytes for a file that is stored compressed
1698 # (double compressed case), there is no way we can validate the hash and
1699 # we will fail our hash check for the object.
1700 if (src_obj_metadata.contentEncoding and
1701 src_obj_metadata.contentEncoding.lower().endswith('gzip')):
1702 # We can't use tempfile.mkstemp() here because we need a predictable
1703 # filename for resumable downloads.
1704 download_file_name = _GetDownloadZipFileName(file_name)
1705 logger.info(
1706 'Downloading to temp gzip filename %s', download_file_name)
1707 need_to_unzip = True
1708 else:
1709 download_file_name = file_name
1710 need_to_unzip = False
1711
1712 if download_file_name.endswith(dst_url.delim):
1713 logger.warn('\n'.join(textwrap.wrap(
1714 'Skipping attempt to download to filename ending with slash (%s). This '
1715 'typically happens when using gsutil to download from a subdirectory '
1716 'created by the Cloud Console (https://cloud.google.com/console)'
1717 % download_file_name)))
1718 return (0, 0, dst_url, '')
1719
1720 # Set up hash digesters.
1721 hash_algs = GetDownloadHashAlgs(
1722 logger, src_has_md5=src_obj_metadata.md5Hash,
1723 src_has_crc32c=src_obj_metadata.crc32c)
1724 digesters = dict((alg, hash_algs[alg]()) for alg in hash_algs or {})
1725
1726 fp = None
1727 # Tracks whether the server used a gzip encoding.
1728 server_encoding = None
1729 download_complete = False
1730 download_strategy = _SelectDownloadStrategy(dst_url)
1731 download_start_point = 0
1732 # This is used for resuming downloads, but also for passing the mediaLink
1733 # and size into the download for new downloads so that we can avoid
1734 # making an extra HTTP call.
1735 serialization_data = None
1736 serialization_dict = GetDownloadSerializationDict(src_obj_metadata)
1737 open_files = []
1738 try:
1739 if download_strategy is CloudApi.DownloadStrategy.ONE_SHOT:
1740 fp = open(download_file_name, 'wb')
1741 elif download_strategy is CloudApi.DownloadStrategy.RESUMABLE:
1742 # If this is a resumable download, we need to open the file for append and
1743 # manage a tracker file.
1744 if open_files_map.get(download_file_name, False):
1745 # Ensure another process/thread is not already writing to this file.
1746 raise FileConcurrencySkipError
1747 open_files.append(download_file_name)
1748 open_files_map[download_file_name] = True
1749 fp = open(download_file_name, 'ab')
1750
1751 resuming = ReadOrCreateDownloadTrackerFile(
1752 src_obj_metadata, dst_url, api_selector)
1753 if resuming:
1754 # Find out how far along we are so we can request the appropriate
1755 # remaining range of the object.
1756 existing_file_size = GetFileSize(fp, position_to_eof=True)
1757 if existing_file_size > src_obj_metadata.size:
1758 DeleteTrackerFile(GetTrackerFilePath(
1759 dst_url, TrackerFileType.DOWNLOAD, api_selector))
1760 raise CommandException(
1761 '%s is larger (%d) than %s (%d).\nDeleting tracker file, so '
1762 'if you re-try this download it will start from scratch' %
1763 (download_file_name, existing_file_size, src_url.object_name,
1764 src_obj_metadata.size))
1765 else:
1766 if existing_file_size == src_obj_metadata.size:
1767 logger.info(
1768 'Download already complete for file %s, skipping download but '
1769 'will run integrity checks.', download_file_name)
1770 download_complete = True
1771 else:
1772 download_start_point = existing_file_size
1773 serialization_dict['progress'] = download_start_point
1774 logger.info('Resuming download for %s', src_url.url_string)
1775 # Catch up our digester with the hash data.
1776 if existing_file_size > TEN_MIB:
1777 for alg_name in digesters:
1778 logger.info(
1779 'Catching up %s for %s', alg_name, download_file_name)
1780 with open(download_file_name, 'rb') as hash_fp:
1781 while True:
1782 data = hash_fp.read(DEFAULT_FILE_BUFFER_SIZE)
1783 if not data:
1784 break
1785 for alg_name in digesters:
1786 digesters[alg_name].update(data)
1787 else:
1788 # Starting a new download, blow away whatever is already there.
1789 fp.truncate(0)
1790 fp.seek(0)
1791
1792 else:
1793 raise CommandException('Invalid download strategy %s chosen for '
1794 'file %s' % (download_strategy, fp.name))
1795
1796 if not dst_url.IsStream():
1797 serialization_data = json.dumps(serialization_dict)
1798
1799 progress_callback = FileProgressCallbackHandler(
1800 ConstructAnnounceText('Downloading', dst_url.url_string),
1801 logger).call
1802 if global_copy_helper_opts.test_callback_file:
1803 with open(global_copy_helper_opts.test_callback_file, 'rb') as test_fp:
1804 progress_callback = pickle.loads(test_fp.read()).call
1805
1806 start_time = time.time()
1807 # TODO: With gzip encoding (which may occur on-the-fly and not be part of
1808 # the object's metadata), when we request a range to resume, it's possible
1809 # that the server will just resend the entire object, which means our
1810 # caught-up hash will be incorrect. We recalculate the hash on
1811 # the local file in the case of a failed gzip hash anyway, but it would
1812 # be better if we actively detected this case.
1813 if not download_complete:
1814 server_encoding = gsutil_api.GetObjectMedia(
1815 src_url.bucket_name, src_url.object_name, fp,
1816 start_byte=download_start_point, generation=src_url.generation,
1817 object_size=src_obj_metadata.size,
1818 download_strategy=download_strategy, provider=src_url.scheme,
1819 serialization_data=serialization_data, digesters=digesters,
1820 progress_callback=progress_callback)
1821
1822 end_time = time.time()
1823
1824 # If a custom test method is defined, call it here. For the copy command,
1825 # test methods are expected to take one argument: an open file pointer,
1826 # and are used to perturb the open file during download to exercise
1827 # download error detection.
1828 if test_method:
1829 test_method(fp)
1830
1831 except ResumableDownloadException as e:
1832 logger.warning('Caught ResumableDownloadException (%s) for file %s.',
1833 e.reason, file_name)
1834 raise
1835
1836 finally:
1837 if fp:
1838 fp.close()
1839 for file_name in open_files:
1840 open_files_map.delete(file_name)
1841
1842 # If we decompressed a content-encoding gzip file on the fly, this may not
1843 # be accurate, but it is the best we can do without going deep into the
1844 # underlying HTTP libraries. Note that this value is only used for
1845 # reporting in log messages; inaccuracy doesn't impact the integrity of the
1846 # download.
1847 bytes_transferred = src_obj_metadata.size - download_start_point
1848 server_gzip = server_encoding and server_encoding.lower().endswith('gzip')
1849 local_md5 = _ValidateDownloadHashes(logger, src_url, src_obj_metadata,
1850 dst_url, need_to_unzip, server_gzip,
1851 digesters, hash_algs, api_selector,
1852 bytes_transferred)
1853
1854 return (end_time - start_time, bytes_transferred, dst_url, local_md5)
1855
1856
1857 def _GetDownloadZipFileName(file_name):
1858 """Returns the file name for a temporarily compressed downloaded file."""
1859 return '%s_.gztmp' % file_name
1860
1861
1862 def _ValidateDownloadHashes(logger, src_url, src_obj_metadata, dst_url,
1863 need_to_unzip, server_gzip, digesters, hash_algs,
1864 api_selector, bytes_transferred):
1865 """Validates a downloaded file's integrity.
1866
1867 Args:
1868 logger: For outputting log messages.
1869 src_url: StorageUrl for the source object.
1870 src_obj_metadata: Metadata for the source object, potentially containing
1871 hash values.
1872 dst_url: StorageUrl describing the destination file.
1873 need_to_unzip: If true, a temporary zip file was used and must be
1874 uncompressed as part of validation.
1875 server_gzip: If true, the server gzipped the bytes (regardless of whether
1876 the object metadata claimed it was gzipped).
1877 digesters: dict of {string, hash digester} that contains up-to-date digests
1878 computed during the download. If a digester for a particular
1879 algorithm is None, an up-to-date digest is not available and the
1880 hash must be recomputed from the local file.
1881 hash_algs: dict of {string, hash algorithm} that can be used if digesters
1882 don't have up-to-date digests.
1883 api_selector: The Cloud API implementation used (for tracker file naming).
1884 bytes_transferred: Number of bytes downloaded (used for logging).
1885
1886 Returns:
1887 An MD5 of the local file, if one was calculated as part of the integrity
1888 check.
1889 """
1890 file_name = dst_url.object_name
1891 download_file_name = (_GetDownloadZipFileName(file_name) if need_to_unzip else
1892 file_name)
1893 digesters_succeeded = True
1894 for alg in digesters:
1895 # If we get a digester with a None algorithm, the underlying
1896 # implementation failed to calculate a digest, so we will need to
1897 # calculate one from scratch.
1898 if not digesters[alg]:
1899 digesters_succeeded = False
1900 break
1901
1902 if digesters_succeeded:
1903 local_hashes = _CreateDigestsFromDigesters(digesters)
1904 else:
1905 local_hashes = _CreateDigestsFromLocalFile(
1906 logger, hash_algs, download_file_name, src_obj_metadata)
1907
1908 digest_verified = True
1909 hash_invalid_exception = None
1910 try:
1911 _CheckHashes(logger, src_url, src_obj_metadata, download_file_name,
1912 local_hashes)
1913 DeleteTrackerFile(GetTrackerFilePath(
1914 dst_url, TrackerFileType.DOWNLOAD, api_selector))
1915 except HashMismatchException as e:
1916 # If a non-gzipped object gets sent with gzip content encoding, the hash
1917 # we calculate will match the gzipped bytes, not the original object. Thus,
1918 # we'll need to calculate and check it after unzipping.
1919 if server_gzip:
1920 logger.debug(
1921 'Hash did not match but server gzipped the content, will '
1922 'recalculate.')
1923 digest_verified = False
1924 elif api_selector == ApiSelector.XML:
1925 logger.debug(
1926 'Hash did not match but server may have gzipped the content, will '
1927 'recalculate.')
1928 # Save off the exception in case this isn't a gzipped file.
1929 hash_invalid_exception = e
1930 digest_verified = False
1931 else:
1932 DeleteTrackerFile(GetTrackerFilePath(
1933 dst_url, TrackerFileType.DOWNLOAD, api_selector))
1934 if _RENAME_ON_HASH_MISMATCH:
1935 os.rename(download_file_name,
1936 download_file_name + _RENAME_ON_HASH_MISMATCH_SUFFIX)
1937 else:
1938 os.unlink(download_file_name)
1939 raise
1940
1941 if server_gzip and not need_to_unzip:
1942 # Server compressed bytes on-the-fly, thus we need to rename and decompress.
1943 # We can't decompress on-the-fly because prior to Python 3.2 the gzip
1944 # module makes a bunch of seek calls on the stream.
1945 download_file_name = _GetDownloadZipFileName(file_name)
1946 os.rename(file_name, download_file_name)
1947
1948 if need_to_unzip or server_gzip:
1949 # Log that we're uncompressing if the file is big enough that
1950 # decompressing would make it look like the transfer "stalled" at the end.
1951 if bytes_transferred > TEN_MIB:
1952 logger.info(
1953 'Uncompressing downloaded tmp file to %s...', file_name)
1954
1955 # Downloaded gzipped file to a filename w/o .gz extension, so unzip.
1956 gzip_fp = None
1957 try:
1958 gzip_fp = gzip.open(download_file_name, 'rb')
1959 with open(file_name, 'wb') as f_out:
1960 data = gzip_fp.read(GZIP_CHUNK_SIZE)
1961 while data:
1962 f_out.write(data)
1963 data = gzip_fp.read(GZIP_CHUNK_SIZE)
1964 except IOError as e:
1965 # In the XML case where we don't know if the file was gzipped, raise
1966 # the original hash exception if we find that it wasn't.
1967 if 'Not a gzipped file' in str(e) and hash_invalid_exception:
1968 # Linter improperly thinks we're raising None despite the above check.
1969 # pylint: disable=raising-bad-type
1970 raise hash_invalid_exception
1971 finally:
1972 if gzip_fp:
1973 gzip_fp.close()
1974
1975 os.unlink(download_file_name)
1976
1977 if not digest_verified:
1978 try:
1979 # Recalculate hashes on the unzipped local file.
1980 local_hashes = _CreateDigestsFromLocalFile(logger, hash_algs, file_name,
1981 src_obj_metadata)
1982 _CheckHashes(logger, src_url, src_obj_metadata, file_name, local_hashes)
1983 DeleteTrackerFile(GetTrackerFilePath(
1984 dst_url, TrackerFileType.DOWNLOAD, api_selector))
1985 except HashMismatchException:
1986 DeleteTrackerFile(GetTrackerFilePath(
1987 dst_url, TrackerFileType.DOWNLOAD, api_selector))
1988 if _RENAME_ON_HASH_MISMATCH:
1989 os.rename(file_name,
1990 file_name + _RENAME_ON_HASH_MISMATCH_SUFFIX)
1991 else:
1992 os.unlink(file_name)
1993 raise
1994
1995 if 'md5' in local_hashes:
1996 return local_hashes['md5']
1997
1998
1999 def _CopyFileToFile(src_url, dst_url):
2000 """Copies a local file to a local file.
2001
2002 Args:
2003 src_url: Source FileUrl.
2004 dst_url: Destination FileUrl.
2005 Returns:
2006 (elapsed_time, bytes_transferred, dst_url, md5=None).
2007
2008 Raises:
2009 CommandException: if errors encountered.
2010 """
2011 src_fp = GetStreamFromFileUrl(src_url)
2012 dir_name = os.path.dirname(dst_url.object_name)
2013 if dir_name and not os.path.exists(dir_name):
2014 os.makedirs(dir_name)
2015 dst_fp = open(dst_url.object_name, 'wb')
2016 start_time = time.time()
2017 shutil.copyfileobj(src_fp, dst_fp)
2018 end_time = time.time()
2019 return (end_time - start_time, os.path.getsize(dst_url.object_name),
2020 dst_url, None)
2021
2022
2023 def _DummyTrackerCallback(_):
2024 pass
2025
2026
2027 # pylint: disable=undefined-variable
2028 def _CopyObjToObjDaisyChainMode(src_url, src_obj_metadata, dst_url,
2029 dst_obj_metadata, preconditions, gsutil_api,
2030 logger):
2031 """Copies from src_url to dst_url in "daisy chain" mode.
2032
2033 See -D OPTION documentation about what daisy chain mode is.
2034
2035 Args:
2036 src_url: Source CloudUrl
2037 src_obj_metadata: Metadata from source object
2038 dst_url: Destination CloudUrl
2039 dst_obj_metadata: Object-specific metadata that should be overridden during
2040 the copy.
2041 preconditions: Preconditions to use for the copy.
2042 gsutil_api: gsutil Cloud API to use for the copy.
2043 logger: For outputting log messages.
2044
2045 Returns:
2046 (elapsed_time, bytes_transferred, dst_url with generation,
2047 md5 hash of destination) excluding overhead like initial GET.
2048
2049 Raises:
2050 CommandException: if errors encountered.
2051 """
2052 # We don't attempt to preserve ACLs across providers because
2053 # GCS and S3 support different ACLs and disjoint principals.
2054 if (global_copy_helper_opts.preserve_acl
2055 and src_url.scheme != dst_url.scheme):
2056 raise NotImplementedError(
2057 'Cross-provider cp -p not supported')
2058 if not global_copy_helper_opts.preserve_acl:
2059 dst_obj_metadata.acl = []
2060
2061 # Don't use callbacks for downloads on the daisy chain wrapper because
2062 # upload callbacks will output progress, but respect test hooks if present.
2063 progress_callback = None
2064 if global_copy_helper_opts.test_callback_file:
2065 with open(global_copy_helper_opts.test_callback_file, 'rb') as test_fp:
2066 progress_callback = pickle.loads(test_fp.read()).call
2067
2068 start_time = time.time()
2069 upload_fp = DaisyChainWrapper(src_url, src_obj_metadata.size, gsutil_api,
2070 progress_callback=progress_callback)
2071 uploaded_object = None
2072 if src_obj_metadata.size == 0:
2073 # Resumable uploads of size 0 are not supported.
2074 uploaded_object = gsutil_api.UploadObject(
2075 upload_fp, object_metadata=dst_obj_metadata,
2076 canned_acl=global_copy_helper_opts.canned_acl,
2077 preconditions=preconditions, provider=dst_url.scheme,
2078 fields=UPLOAD_RETURN_FIELDS, size=src_obj_metadata.size)
2079 else:
2080 # TODO: Support process-break resumes. This will resume across connection
2081 # breaks and server errors, but the tracker callback is a no-op so this
2082 # won't resume across gsutil runs.
2083 # TODO: Test retries via test_callback_file.
2084 uploaded_object = gsutil_api.UploadObjectResumable(
2085 upload_fp, object_metadata=dst_obj_metadata,
2086 canned_acl=global_copy_helper_opts.canned_acl,
2087 preconditions=preconditions, provider=dst_url.scheme,
2088 fields=UPLOAD_RETURN_FIELDS, size=src_obj_metadata.size,
2089 progress_callback=FileProgressCallbackHandler(
2090 ConstructAnnounceText('Uploading', dst_url.url_string),
2091 logger).call,
2092 tracker_callback=_DummyTrackerCallback)
2093 end_time = time.time()
2094
2095 try:
2096 _CheckCloudHashes(logger, src_url, dst_url, src_obj_metadata,
2097 uploaded_object)
2098 except HashMismatchException:
2099 if _RENAME_ON_HASH_MISMATCH:
2100 corrupted_obj_metadata = apitools_messages.Object(
2101 name=dst_obj_metadata.name,
2102 bucket=dst_obj_metadata.bucket,
2103 etag=uploaded_object.etag)
2104 dst_obj_metadata.name = (dst_url.object_name +
2105 _RENAME_ON_HASH_MISMATCH_SUFFIX)
2106 gsutil_api.CopyObject(corrupted_obj_metadata,
2107 dst_obj_metadata, provider=dst_url.scheme)
2108 # If the digest doesn't match, delete the object.
2109 gsutil_api.DeleteObject(dst_url.bucket_name, dst_url.object_name,
2110 generation=uploaded_object.generation,
2111 provider=dst_url.scheme)
2112 raise
2113
2114 result_url = dst_url.Clone()
2115 result_url.generation = GenerationFromUrlAndString(
2116 result_url, uploaded_object.generation)
2117
2118 return (end_time - start_time, src_obj_metadata.size, result_url,
2119 uploaded_object.md5Hash)
2120
2121
2122 # pylint: disable=undefined-variable
2123 # pylint: disable=too-many-statements
2124 def PerformCopy(logger, src_url, dst_url, gsutil_api, command_obj,
2125 copy_exception_handler, allow_splitting=True,
2126 headers=None, manifest=None, gzip_exts=None, test_method=None):
2127 """Performs copy from src_url to dst_url, handling various special cases.
2128
2129 Args:
2130 logger: for outputting log messages.
2131 src_url: Source StorageUrl.
2132 dst_url: Destination StorageUrl.
2133 gsutil_api: gsutil Cloud API instance to use for the copy.
2134 command_obj: command object for use in Apply in parallel composite uploads.
2135 copy_exception_handler: for handling copy exceptions during Apply.
2136 allow_splitting: Whether to allow the file to be split into component
2137 pieces for a parallel composite upload.
2138 headers: optional headers to use for the copy operation.
2139 manifest: optional manifest for tracking copy operations.
2140 gzip_exts: List of file extensions to gzip for uploads, if any.
2141 test_method: optional test method for modifying files during unit tests.
2142
2143 Returns:
2144 (elapsed_time, bytes_transferred, version-specific dst_url) excluding
2145 overhead like initial GET.
2146
2147 Raises:
2148 ItemExistsError: if no clobber flag is specified and the destination
2149 object already exists.
2150 SkipUnsupportedObjectError: if skip_unsupported_objects flag is specified
2151 and the source is an unsupported type.
2152 CommandException: if other errors encountered.
2153 """
2154 if headers:
2155 dst_obj_headers = headers.copy()
2156 else:
2157 dst_obj_headers = {}
2158
2159 # Create a metadata instance for each destination object so metadata
2160 # such as content-type can be applied per-object.
2161 # Initialize metadata from any headers passed in via -h.
2162 dst_obj_metadata = ObjectMetadataFromHeaders(dst_obj_headers)
2163
2164 if dst_url.IsCloudUrl() and dst_url.scheme == 'gs':
2165 preconditions = PreconditionsFromHeaders(dst_obj_headers)
2166 else:
2167 preconditions = Preconditions()
2168
2169 src_obj_metadata = None
2170 src_obj_filestream = None
2171 if src_url.IsCloudUrl():
2172 src_obj_fields = None
2173 if dst_url.IsCloudUrl():
2174 # For cloud or daisy chain copy, we need every copyable field.
2175 # If we're not modifying or overriding any of the fields, we can get
2176 # away without retrieving the object metadata because the copy
2177 # operation can succeed with just the destination bucket and object
2178 # name. But if we are sending any metadata, the JSON API will expect a
2179 # complete object resource. Since we want metadata like the object size
2180 # for our own tracking, we just get all of the metadata here.
2181 src_obj_fields = ['cacheControl', 'componentCount',
2182 'contentDisposition', 'contentEncoding',
2183 'contentLanguage', 'contentType', 'crc32c',
2184 'etag', 'generation', 'md5Hash', 'mediaLink',
2185 'metadata', 'metageneration', 'size']
2186 # We only need the ACL if we're going to preserve it.
2187 if global_copy_helper_opts.preserve_acl:
2188 src_obj_fields.append('acl')
2189 if (src_url.scheme == dst_url.scheme
2190 and not global_copy_helper_opts.daisy_chain):
2191 copy_in_the_cloud = True
2192 else:
2193 copy_in_the_cloud = False
2194 else:
2195 # Just get the fields needed to validate the download.
2196 src_obj_fields = ['crc32c', 'contentEncoding', 'contentType', 'etag',
2197 'mediaLink', 'md5Hash', 'size']
2198
2199 if (src_url.scheme == 's3' and
2200 global_copy_helper_opts.skip_unsupported_objects):
2201 src_obj_fields.append('storageClass')
2202
2203 try:
2204 src_generation = GenerationFromUrlAndString(src_url, src_url.generation)
2205 src_obj_metadata = gsutil_api.GetObjectMetadata(
2206 src_url.bucket_name, src_url.object_name,
2207 generation=src_generation, provider=src_url.scheme,
2208 fields=src_obj_fields)
2209 except NotFoundException:
2210 raise CommandException(
2211 'NotFoundException: Could not retrieve source object %s.' %
2212 src_url.url_string)
2213 if (src_url.scheme == 's3' and
2214 global_copy_helper_opts.skip_unsupported_objects and
2215 src_obj_metadata.storageClass == 'GLACIER'):
2216 raise SkipGlacierError()
2217
2218 src_obj_size = src_obj_metadata.size
2219 dst_obj_metadata.contentType = src_obj_metadata.contentType
2220 if global_copy_helper_opts.preserve_acl:
2221 dst_obj_metadata.acl = src_obj_metadata.acl
2222 # Special case for S3-to-S3 copy URLs using
2223 # global_copy_helper_opts.preserve_acl.
2224 # dst_url will be verified in _CopyObjToObjDaisyChainMode if it
2225 # is not s3 (and thus differs from src_url).
2226 if src_url.scheme == 's3':
2227 acl_text = S3MarkerAclFromObjectMetadata(src_obj_metadata)
2228 if acl_text:
2229 AddS3MarkerAclToObjectMetadata(dst_obj_metadata, acl_text)
2230 else:
2231 try:
2232 src_obj_filestream = GetStreamFromFileUrl(src_url)
2233 except Exception as e: # pylint: disable=broad-except
2234 raise CommandException('Error opening file "%s": %s.' % (src_url,
2235 e.message))
2236 if src_url.IsStream():
2237 src_obj_size = None
2238 else:
2239 src_obj_size = os.path.getsize(src_url.object_name)
2240
2241 if global_copy_helper_opts.use_manifest:
2242 # Set the source size in the manifest.
2243 manifest.Set(src_url.url_string, 'size', src_obj_size)
2244
2245 if (dst_url.scheme == 's3' and src_obj_size > S3_MAX_UPLOAD_SIZE
2246 and src_url.scheme != 's3'):
2247 raise CommandException(
2248 '"%s" exceeds the maximum gsutil-supported size for an S3 upload. S3 '
2249 'objects greater than %s in size require multipart uploads, which '
2250 'gsutil does not support.' % (src_url,
2251 MakeHumanReadable(S3_MAX_UPLOAD_SIZE)))
2252
2253 # On Windows, stdin is opened as text mode instead of binary which causes
2254 # problems when piping a binary file, so this switches it to binary mode.
2255 if IS_WINDOWS and src_url.IsFileUrl() and src_url.IsStream():
2256 msvcrt.setmode(GetStreamFromFileUrl(src_url).fileno(), os.O_BINARY)
2257
2258 if global_copy_helper_opts.no_clobber:
2259 # There are two checks to prevent clobbering:
2260 # 1) The first check is to see if the URL
2261 # already exists at the destination and prevent the upload/download
2262 # from happening. This is done by the exists() call.
2263 # 2) The second check is only relevant if we are writing to gs. We can
2264 # enforce that the server only writes the object if it doesn't exist
2265 # by specifying the header below. This check only happens at the
2266 # server after the complete file has been uploaded. We specify this
2267 # header to prevent a race condition where a destination file may
2268 # be created after the first check and before the file is fully
2269 # uploaded.
2270 # In order to save on unnecessary uploads/downloads we perform both
2271 # checks. However, this may come at the cost of additional HTTP calls.
2272 if preconditions.gen_match:
2273 raise ArgumentException('Specifying x-goog-if-generation-match is '
2274 'not supported with cp -n')
2275 else:
2276 preconditions.gen_match = 0
2277 if dst_url.IsFileUrl() and os.path.exists(dst_url.object_name):
2278 # The local file may be a partial. Check the file sizes.
2279 if src_obj_size == os.path.getsize(dst_url.object_name):
2280 raise ItemExistsError()
2281 elif dst_url.IsCloudUrl():
2282 try:
2283 dst_object = gsutil_api.GetObjectMetadata(
2284 dst_url.bucket_name, dst_url.object_name, provider=dst_url.scheme)
2285 except NotFoundException:
2286 dst_object = None
2287 if dst_object:
2288 raise ItemExistsError()
2289
2290 if dst_url.IsCloudUrl():
2291 # Cloud storage API gets object and bucket name from metadata.
2292 dst_obj_metadata.name = dst_url.object_name
2293 dst_obj_metadata.bucket = dst_url.bucket_name
2294 if src_url.IsCloudUrl():
2295 # Preserve relevant metadata from the source object if it's not already
2296 # provided from the headers.
2297 CopyObjectMetadata(src_obj_metadata, dst_obj_metadata, override=False)
2298 src_obj_metadata.name = src_url.object_name
2299 src_obj_metadata.bucket = src_url.bucket_name
2300 else:
2301 _SetContentTypeFromFile(src_url, dst_obj_metadata)
2302 else:
2303 # Files don't have Cloud API metadata.
2304 dst_obj_metadata = None
2305
2306 _LogCopyOperation(logger, src_url, dst_url, dst_obj_metadata)
2307
2308 if src_url.IsCloudUrl():
2309 if dst_url.IsFileUrl():
2310 return _DownloadObjectToFile(src_url, src_obj_metadata, dst_url,
2311 gsutil_api, logger, test_method=test_method)
2312 elif copy_in_the_cloud:
2313 return _CopyObjToObjInTheCloud(src_url, src_obj_metadata, dst_url,
2314 dst_obj_metadata, preconditions,
2315 gsutil_api, logger)
2316 else:
2317 return _CopyObjToObjDaisyChainMode(src_url, src_obj_metadata,
2318 dst_url, dst_obj_metadata,
2319 preconditions, gsutil_api, logger)
2320 else: # src_url.IsFileUrl()
2321 if dst_url.IsCloudUrl():
2322 return _UploadFileToObject(
2323 src_url, src_obj_filestream, src_obj_size, dst_url,
2324 dst_obj_metadata, preconditions, gsutil_api, logger, command_obj,
2325 copy_exception_handler, gzip_exts=gzip_exts,
2326 allow_splitting=allow_splitting)
2327 else: # dst_url.IsFileUrl()
2328 return _CopyFileToFile(src_url, dst_url)
2329
2330
2331 class Manifest(object):
2332 """Stores the manifest items for the CpCommand class."""
2333
2334 def __init__(self, path):
2335 # self.items contains a dictionary of rows
2336 self.items = {}
2337 self.manifest_filter = {}
2338 self.lock = CreateLock()
2339
2340 self.manifest_path = os.path.expanduser(path)
2341 self._ParseManifest()
2342 self._CreateManifestFile()
2343
2344 def _ParseManifest(self):
2345 """Load and parse a manifest file.
2346
2347 This information will be used to skip any files that have a skip or OK
2348 status.
2349 """
2350 try:
2351 if os.path.exists(self.manifest_path):
2352 with open(self.manifest_path, 'rb') as f:
2353 first_row = True
2354 reader = csv.reader(f)
2355 for row in reader:
2356 if first_row:
2357 try:
2358 source_index = row.index('Source')
2359 result_index = row.index('Result')
2360 except ValueError:
2361 # No header and thus not a valid manifest file.
2362 raise CommandException(
2363 'Missing headers in manifest file: %s' % self.manifest_path)
2364 first_row = False
2365 source = row[source_index]
2366 result = row[result_index]
2367 if result in ['OK', 'skip']:
2368 # We're always guaranteed to take the last result of a specific
2369 # source url.
2370 self.manifest_filter[source] = result
2371 except IOError:
2372 raise CommandException('Could not parse %s' % self.manifest_path)
2373
2374 def WasSuccessful(self, src):
2375 """Returns whether the specified src url was marked as successful."""
2376 return src in self.manifest_filter
2377
2378 def _CreateManifestFile(self):
2379 """Opens the manifest file and assigns it to the file pointer."""
2380 try:
2381 if ((not os.path.exists(self.manifest_path))
2382 or (os.stat(self.manifest_path).st_size == 0)):
2383 # Add headers to the new file.
2384 with open(self.manifest_path, 'wb', 1) as f:
2385 writer = csv.writer(f)
2386 writer.writerow(['Source',
2387 'Destination',
2388 'Start',
2389 'End',
2390 'Md5',
2391 'UploadId',
2392 'Source Size',
2393 'Bytes Transferred',
2394 'Result',
2395 'Description'])
2396 except IOError:
2397 raise CommandException('Could not create manifest file.')
2398
2399 def Set(self, url, key, value):
2400 if value is None:
2401 # In case we don't have any information to set we bail out here.
2402 # This is so that we don't clobber existing information.
2403 # To zero information pass '' instead of None.
2404 return
2405 if url in self.items:
2406 self.items[url][key] = value
2407 else:
2408 self.items[url] = {key: value}
2409
2410 def Initialize(self, source_url, destination_url):
2411 # Always use the source_url as the key for the item. This is unique.
2412 self.Set(source_url, 'source_uri', source_url)
2413 self.Set(source_url, 'destination_uri', destination_url)
2414 self.Set(source_url, 'start_time', datetime.datetime.utcnow())
2415
2416 def SetResult(self, source_url, bytes_transferred, result,
2417 description=''):
2418 self.Set(source_url, 'bytes', bytes_transferred)
2419 self.Set(source_url, 'result', result)
2420 self.Set(source_url, 'description', description)
2421 self.Set(source_url, 'end_time', datetime.datetime.utcnow())
2422 self._WriteRowToManifestFile(source_url)
2423 self._RemoveItemFromManifest(source_url)
2424
2425 def _WriteRowToManifestFile(self, url):
2426 """Writes a manifest entry to the manifest file for the url argument."""
2427 row_item = self.items[url]
2428 data = [
2429 str(row_item['source_uri'].encode(UTF8)),
2430 str(row_item['destination_uri'].encode(UTF8)),
2431 '%sZ' % row_item['start_time'].isoformat(),
2432 '%sZ' % row_item['end_time'].isoformat(),
2433 row_item['md5'] if 'md5' in row_item else '',
2434 row_item['upload_id'] if 'upload_id' in row_item else '',
2435 str(row_item['size']) if 'size' in row_item else '',
2436 str(row_item['bytes']) if 'bytes' in row_item else '',
2437 row_item['result'],
2438 row_item['description'].encode(UTF8)]
2439
2440 # Acquire a lock to prevent multiple threads writing to the same file at
2441 # the same time. This would cause a garbled mess in the manifest file.
2442 with self.lock:
2443 with open(self.manifest_path, 'a', 1) as f: # 1 == line buffered
2444 writer = csv.writer(f)
2445 writer.writerow(data)
2446
2447 def _RemoveItemFromManifest(self, url):
2448 # Remove the item from the dictionary since we're done with it and
2449 # we don't want the dictionary to grow too large in memory for no good
2450 # reason.
2451 del self.items[url]
2452
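# An illustrative manifest row matching the header columns written in
# _CreateManifestFile (all values below are made up; times are UTC ISO 8601
# with a trailing 'Z', and Md5/UploadId may be empty when unknown):
#
#   file://photo.png,gs://example-bucket/photo.png,2015-10-01T12:00:00Z,
#   2015-10-01T12:00:05Z,,,1048576,1048576,OK,
#
# (shown wrapped here; the writer emits it as a single CSV line)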
2453
2454 class ItemExistsError(Exception):
2455 """Exception class for objects that are skipped because they already exist."""
2456 pass
2457
2458
2459 class SkipUnsupportedObjectError(Exception):
2460 """Exception for objects skipped because they are an unsupported type."""
2461
2462 def __init__(self):
2463 super(SkipUnsupportedObjectError, self).__init__()
2464 self.unsupported_type = 'Unknown'
2465
2466
2467 class SkipGlacierError(SkipUnsupportedObjectError):
2468 """Exception for objects skipped because they are an unsupported type."""
2469
2470 def __init__(self):
2471 super(SkipGlacierError, self).__init__()
2472 self.unsupported_type = 'GLACIER'
2473
2474
2475 def GetPathBeforeFinalDir(url):
2476 """Returns the path section before the final directory component of the URL.
2477
2478 This handles cases for file system directories, buckets, and bucket
2479 subdirectories. Example: for gs://bucket/dir/ we'll return 'gs://bucket',
2480 and for file://dir we'll return file://
2481
2482 Args:
2483 url: StorageUrl representing a filesystem directory, cloud bucket or
2484 bucket subdir.
2485
2486 Returns:
2487 String name of above-described path, sans final path separator.
2488 """
2489 sep = url.delim
2490 if url.IsFileUrl():
2491 past_scheme = url.url_string[len('file://'):]
2492 if past_scheme.find(sep) == -1:
2493 return 'file://'
2494 else:
2495 return 'file://%s' % past_scheme.rstrip(sep).rpartition(sep)[0]
2496 if url.IsBucket():
2497 return '%s://' % url.scheme
2498 # Else it names a bucket subdir.
2499 return url.url_string.rstrip(sep).rpartition(sep)[0]
2500
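# Worked examples of the behavior described above, assuming '/' delimiters:
#   gs://bucket/dir/   -> 'gs://bucket'   (bucket subdir: strip the final dir)
#   gs://bucket        -> 'gs://'         (bucket: scheme only)
#   file://tmp/outdir  -> 'file://tmp'    (strip the final directory)
#   file://outdir      -> 'file://'       (no separator after the scheme)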
2501
2502 def _DivideAndCeil(dividend, divisor):
2503 """Returns ceil(dividend / divisor).
2504
2505 Takes care to avoid the pitfalls of floating point arithmetic that could
2506 otherwise yield the wrong result for large numbers.
2507
2508 Args:
2509 dividend: Dividend for the operation.
2510 divisor: Divisor for the operation.
2511
2512 Returns:
2513 Quotient.
2514 """
2515 quotient = dividend // divisor
2516 if (dividend % divisor) != 0:
2517 quotient += 1
2518 return quotient
2519
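# Worked example: _DivideAndCeil(7, 2) == 4 (7 // 2 == 3, plus 1 for the
# nonzero remainder), while _DivideAndCeil(6, 2) == 3 exactly. Staying in
# integer arithmetic avoids the float rounding that ceil(a / float(b)) can
# suffer for very large byte counts.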
2520
2521 def _GetPartitionInfo(file_size, max_components, default_component_size):
2522 """Gets info about a file partition for parallel composite uploads.
2523
2524 Args:
2525 file_size: The number of bytes in the file to be partitioned.
2526 max_components: The maximum number of components that can be composed.
2527 default_component_size: The size of a component, assuming that
2528 max_components is infinite.
2529 Returns:
2530 The number of components in the partitioned file, and the size of each
2531 component (except the last, which will have a different size iff
2532 file_size != 0 (mod num_components)).
2533 """
2534 # num_components = ceil(file_size / default_component_size)
2535 num_components = _DivideAndCeil(file_size, default_component_size)
2536
2537 # num_components must be in the range [2, max_components]
2538 num_components = max(min(num_components, max_components), 2)
2539
2540 # component_size = ceil(file_size / num_components)
2541 component_size = _DivideAndCeil(file_size, num_components)
2542 return (num_components, component_size)
2543
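# Worked example with illustrative sizes: for file_size = 150 MiB,
# default_component_size = 50 MiB and max_components = 32, the initial
# ceil(150 / 50) = 3 lies in [2, 32], so the result is 3 components of
# ceil(150 / 3) = 50 MiB each. A 10 MiB file would first yield 1, which is
# clamped up to 2 components of ceil(10 / 2) = 5 MiB each.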
2544
2545 def _DeleteObjectFn(cls, url_to_delete, thread_state=None):
2546 """Wrapper function to be used with command.Apply()."""
2547 gsutil_api = GetCloudApiInstance(cls, thread_state)
2548 gsutil_api.DeleteObject(
2549 url_to_delete.bucket_name, url_to_delete.object_name,
2550 generation=url_to_delete.generation, provider=url_to_delete.scheme)
2551
2552
2553 def _ParseParallelUploadTrackerFile(tracker_file, tracker_file_lock):
2554 """Parse the tracker file from the last parallel composite upload attempt.
2555
2556 If it exists, the tracker file is of the format described in
2557 _CreateParallelUploadTrackerFile. If the file doesn't exist or cannot be
2558 read, then the upload will start from the beginning.
2559
2560 Args:
2561 tracker_file: The name of the file to parse.
2562 tracker_file_lock: Lock protecting access to the tracker file.
2563
2564 Returns:
2565 random_prefix: A randomly-generated prefix to the name of the
2566 temporary components.
2567 existing_objects: A list of ObjectFromTracker objects representing
2568 the set of files that have already been uploaded.
2569 """
2570
2571 def GenerateRandomPrefix():
2572 return str(random.randint(1, (10 ** 10) - 1))
2573
2574 existing_objects = []
2575 try:
2576 with tracker_file_lock:
2577 with open(tracker_file, 'r') as fp:
2578 lines = fp.readlines()
2579 lines = [line.strip() for line in lines]
2580 if not lines:
2581 print('Parallel upload tracker file (%s) was invalid. '
2582 'Restarting upload from scratch.' % tracker_file)
2583 lines = [GenerateRandomPrefix()]
2584
2585 except IOError as e:
2586 # We can't read the tracker file, so generate a new random prefix.
2587 lines = [GenerateRandomPrefix()]
2588
2589 # Ignore non-existent file (happens first time an upload
2590 # is attempted on a file), but warn user for other errors.
2591 if e.errno != errno.ENOENT:
2592 # Will restart because we failed to read in the file.
2593 print('Couldn\'t read parallel upload tracker file (%s): %s. '
2594 'Restarting upload from scratch.' % (tracker_file, e.strerror))
2595
2596 # The first line contains the randomly-generated prefix.
2597 random_prefix = lines[0]
2598
2599 # The remaining lines were written in pairs to describe a single component
2600 # in the form:
2601 # object_name (without random prefix)
2602 # generation
2603 # Newlines are used as the delimiter because only newlines and carriage
2604 # returns are invalid characters in object names, and users can specify
2605 # a custom prefix in the config file.
2606 i = 1
2607 while i < len(lines):
2608 (name, generation) = (lines[i], lines[i+1])
2609 if not generation:
2610 # Cover the '' case.
2611 generation = None
2612 existing_objects.append(ObjectFromTracker(name, generation))
2613 i += 2
2614 return (random_prefix, existing_objects)
2615
2616
2617 def _AppendComponentTrackerToParallelUploadTrackerFile(tracker_file, component,
2618 tracker_file_lock):
2619 """Appends info about the uploaded component to an existing tracker file.
2620
2621 Follows the format described in _CreateParallelUploadTrackerFile.
2622
2623 Args:
2624 tracker_file: Tracker file to append to.
2625 component: Component that was uploaded.
2626 tracker_file_lock: Thread and process-safe Lock for the tracker file.
2627 """
2628 lines = _GetParallelUploadTrackerFileLinesForComponents([component])
2629 lines = [line + '\n' for line in lines]
2630 with tracker_file_lock:
2631 with open(tracker_file, 'a') as f:
2632 f.writelines(lines)
2633
2634
2635 def _CreateParallelUploadTrackerFile(tracker_file, random_prefix, components,
2636 tracker_file_lock):
2637 """Writes information about components that were successfully uploaded.
2638
2639 This way the upload can be resumed at a later date. The tracker file has
2640 the format:
2641 random_prefix
2642 temp_object_1_name
2643 temp_object_1_generation
2644 .
2645 .
2646 .
2647 temp_object_N_name
2648 temp_object_N_generation
2649 where N is the number of components that have been successfully uploaded.
2650
2651 Args:
2652 tracker_file: The name of the parallel upload tracker file.
2653 random_prefix: The randomly-generated prefix that was used for
2654 uploading any existing components.
2655 components: A list of ObjectFromTracker objects that were uploaded.
2656 tracker_file_lock: The lock protecting access to the tracker file.
2657 """
2658 lines = [random_prefix]
2659 lines += _GetParallelUploadTrackerFileLinesForComponents(components)
2660 lines = [line + '\n' for line in lines]
2661 try:
2662 with tracker_file_lock:
2663 open(tracker_file, 'w').close() # Clear the file.
2664 with open(tracker_file, 'w') as f:
2665 f.writelines(lines)
2666 except IOError as e:
2667 RaiseUnwritableTrackerFileException(tracker_file, e.strerror)
2668
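# An illustrative parallel upload tracker file for two uploaded components
# (the prefix, component names, and generations below are made up):
#
#   4808396230
#   my_file.bin_0
#   1450000000000001
#   my_file.bin_1
#   1450000000000002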
2669
2670 def _GetParallelUploadTrackerFileLinesForComponents(components):
2671 """Return a list of the lines for use in a parallel upload tracker file.
2672
2673 The lines represent the given components, using the format as described in
2674 _CreateParallelUploadTrackerFile.
2675
2676 Args:
2677 components: A list of ObjectFromTracker objects that were uploaded.
2678
2679 Returns:
2680 Lines describing components with their generation for outputting to the
2681 tracker file.
2682 """
2683 lines = []
2684 for component in components:
2685 generation = None
2686 generation = component.generation
2687 if not generation:
2688 generation = ''
2689 lines += [component.object_name, str(generation)]
2690 return lines
2691
2692
2693 def FilterExistingComponents(dst_args, existing_components, bucket_url,
2694 gsutil_api):
2695 """Determines course of action for component objects.
2696
2697 Given the list of all target objects based on partitioning the file and
2698 the list of objects that have already been uploaded successfully,
2699 this function determines which objects should be uploaded, which
2700 existing components are still valid, and which existing components should
2701 be deleted.
2702
2703 Args:
2704 dst_args: The map of file_name -> PerformParallelUploadFileToObjectArgs
2705 calculated by partitioning the file.
2706 existing_components: A list of ObjectFromTracker objects that have been
2707 uploaded in the past.
2708 bucket_url: CloudUrl of the bucket in which the components exist.
2709 gsutil_api: gsutil Cloud API instance to use for retrieving object metadata.
2710
2711 Returns:
2712 components_to_upload: List of components that need to be uploaded.
2713 uploaded_components: List of components that have already been
2714 uploaded and are still valid.
2715 existing_objects_to_delete: List of components that have already
2716 been uploaded, but are no longer valid
2717 and are in a versioned bucket, and
2718 therefore should be deleted.
2719 """
2720 components_to_upload = []
2721 existing_component_names = [component.object_name
2722 for component in existing_components]
2723 for component_name in dst_args:
2724 if component_name not in existing_component_names:
2725 components_to_upload.append(dst_args[component_name])
2726
2727 objects_already_chosen = []
2728
2729 # Don't reuse any temporary components whose MD5 doesn't match the current
2730 # MD5 of the corresponding part of the file. If the bucket is versioned,
2731 # also make sure that we delete the existing temporary version.
2732 existing_objects_to_delete = []
2733 uploaded_components = []
2734 for tracker_object in existing_components:
2735 if (tracker_object.object_name not in dst_args.keys()
2736 or tracker_object.object_name in objects_already_chosen):
2737 # This could happen if the component size has changed. This also serves
2738 # to handle object names that get duplicated in the tracker file due
2739 # to people doing things they shouldn't (e.g., overwriting an existing
2740 # temporary component in a versioned bucket).
2741
2742 url = bucket_url.Clone()
2743 url.object_name = tracker_object.object_name
2744 url.generation = tracker_object.generation
2745 existing_objects_to_delete.append(url)
2746 continue
2747
2748 dst_arg = dst_args[tracker_object.object_name]
2749 file_part = FilePart(dst_arg.filename, dst_arg.file_start,
2750 dst_arg.file_length)
2751 # TODO: calculate MD5's in parallel when possible.
2752 content_md5 = CalculateB64EncodedMd5FromContents(file_part)
2753
2754 try:
2755 # Get the MD5 of the currently-existing component.
2756 dst_url = dst_arg.dst_url
2757 dst_metadata = gsutil_api.GetObjectMetadata(
2758 dst_url.bucket_name, dst_url.object_name,
2759 generation=dst_url.generation, provider=dst_url.scheme,
2760 fields=['md5Hash', 'etag'])
2761 cloud_md5 = dst_metadata.md5Hash
2762 except Exception: # pylint: disable=broad-except
2763 # We don't actually care what went wrong - we couldn't retrieve the
2764 # object to check the MD5, so just upload it again.
2765 cloud_md5 = None
2766
2767 if cloud_md5 != content_md5:
2768 components_to_upload.append(dst_arg)
2769 objects_already_chosen.append(tracker_object.object_name)
2770 if tracker_object.generation:
2771 # If the old object doesn't have a generation (i.e., it isn't in a
2772 # versioned bucket), then we will just overwrite it anyway.
2773 invalid_component_with_generation = dst_arg.dst_url.Clone()
2774 invalid_component_with_generation.generation = tracker_object.generation
2775 existing_objects_to_delete.append(invalid_component_with_generation)
2776 else:
2777 url = dst_arg.dst_url.Clone()
2778 url.generation = tracker_object.generation
2779 uploaded_components.append(url)
2780 objects_already_chosen.append(tracker_object.object_name)
2781
2782 if uploaded_components:
2783 logging.info('Found %d existing temporary components to reuse.',
2784 len(uploaded_components))
2785
2786 return (components_to_upload, uploaded_components,
2787 existing_objects_to_delete)