Chromium Code Reviews

Side by Side Diff: third_party/gsutil/gslib/copy_helper.py

Issue 1380943003: Roll version of gsutil to 4.15. (Closed) Base URL: https://github.com/catapult-project/catapult.git@master
Patch Set: rebase Created 5 years ago
OLD | NEW
1 # -*- coding: utf-8 -*- 1 # -*- coding: utf-8 -*-
2 # Copyright 2011 Google Inc. All Rights Reserved. 2 # Copyright 2011 Google Inc. All Rights Reserved.
3 # Copyright 2011, Nexenta Systems Inc. 3 # Copyright 2011, Nexenta Systems Inc.
4 # 4 #
5 # Licensed under the Apache License, Version 2.0 (the "License"); 5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License. 6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at 7 # You may obtain a copy of the License at
8 # 8 #
9 # http://www.apache.org/licenses/LICENSE-2.0 9 # http://www.apache.org/licenses/LICENSE-2.0
10 # 10 #
11 # Unless required by applicable law or agreed to in writing, software 11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS, 12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and 14 # See the License for the specific language governing permissions and
15 # limitations under the License. 15 # limitations under the License.
16 """Helper functions for copy functionality.""" 16 """Helper functions for copy functionality."""
17 17
18 from __future__ import absolute_import 18 from __future__ import absolute_import
19 19
20 import base64 20 import base64
21 from collections import namedtuple 21 from collections import namedtuple
22 import csv 22 import csv
23 import datetime 23 import datetime
24 import errno 24 import errno
25 import gzip 25 import gzip
26 from hashlib import md5 26 from hashlib import md5
27 import json 27 import json
28 import logging 28 import logging
29 import mimetypes 29 import mimetypes
30 import multiprocessing 30 from operator import attrgetter
31 import os 31 import os
32 import pickle 32 import pickle
33 import random 33 import random
34 import re 34 import re
35 import shutil 35 import shutil
36 import stat 36 import stat
37 import subprocess 37 import subprocess
38 import tempfile 38 import tempfile
39 import textwrap 39 import textwrap
40 import time 40 import time
41 import traceback 41 import traceback
42 42
43 from boto import config 43 from boto import config
44 import crcmod 44 import crcmod
45 45
46 import gslib 46 import gslib
47 from gslib.cloud_api import ArgumentException 47 from gslib.cloud_api import ArgumentException
48 from gslib.cloud_api import CloudApi 48 from gslib.cloud_api import CloudApi
49 from gslib.cloud_api import NotFoundException 49 from gslib.cloud_api import NotFoundException
50 from gslib.cloud_api import PreconditionException 50 from gslib.cloud_api import PreconditionException
51 from gslib.cloud_api import Preconditions 51 from gslib.cloud_api import Preconditions
52 from gslib.cloud_api import ResumableDownloadException 52 from gslib.cloud_api import ResumableDownloadException
53 from gslib.cloud_api import ResumableUploadAbortException 53 from gslib.cloud_api import ResumableUploadAbortException
54 from gslib.cloud_api import ResumableUploadException 54 from gslib.cloud_api import ResumableUploadException
55 from gslib.cloud_api import ResumableUploadStartOverException 55 from gslib.cloud_api import ResumableUploadStartOverException
56 from gslib.cloud_api_helper import GetDownloadSerializationDict 56 from gslib.cloud_api_helper import GetDownloadSerializationData
57 from gslib.commands.compose import MAX_COMPOSE_ARITY 57 from gslib.commands.compose import MAX_COMPOSE_ARITY
58 from gslib.commands.config import DEFAULT_PARALLEL_COMPOSITE_UPLOAD_COMPONENT_SIZE 58 from gslib.commands.config import DEFAULT_PARALLEL_COMPOSITE_UPLOAD_COMPONENT_SIZE
59 from gslib.commands.config import DEFAULT_PARALLEL_COMPOSITE_UPLOAD_THRESHOLD 59 from gslib.commands.config import DEFAULT_PARALLEL_COMPOSITE_UPLOAD_THRESHOLD
60 from gslib.commands.config import DEFAULT_SLICED_OBJECT_DOWNLOAD_COMPONENT_SIZE
61 from gslib.commands.config import DEFAULT_SLICED_OBJECT_DOWNLOAD_MAX_COMPONENTS
62 from gslib.commands.config import DEFAULT_SLICED_OBJECT_DOWNLOAD_THRESHOLD
60 from gslib.cs_api_map import ApiSelector 63 from gslib.cs_api_map import ApiSelector
61 from gslib.daisy_chain_wrapper import DaisyChainWrapper 64 from gslib.daisy_chain_wrapper import DaisyChainWrapper
62 from gslib.exception import CommandException 65 from gslib.exception import CommandException
63 from gslib.exception import HashMismatchException 66 from gslib.exception import HashMismatchException
64 from gslib.file_part import FilePart 67 from gslib.file_part import FilePart
65 from gslib.hashing_helper import Base64EncodeHash 68 from gslib.hashing_helper import Base64EncodeHash
66 from gslib.hashing_helper import CalculateB64EncodedMd5FromContents 69 from gslib.hashing_helper import CalculateB64EncodedMd5FromContents
67 from gslib.hashing_helper import CalculateHashesFromContents 70 from gslib.hashing_helper import CalculateHashesFromContents
71 from gslib.hashing_helper import CHECK_HASH_IF_FAST_ELSE_FAIL
72 from gslib.hashing_helper import CHECK_HASH_NEVER
73 from gslib.hashing_helper import ConcatCrc32c
68 from gslib.hashing_helper import GetDownloadHashAlgs 74 from gslib.hashing_helper import GetDownloadHashAlgs
69 from gslib.hashing_helper import GetUploadHashAlgs 75 from gslib.hashing_helper import GetUploadHashAlgs
70 from gslib.hashing_helper import HashingFileUploadWrapper 76 from gslib.hashing_helper import HashingFileUploadWrapper
71 from gslib.parallelism_framework_util import ThreadAndProcessSafeDict 77 from gslib.parallelism_framework_util import AtomicDict
72 from gslib.parallelism_framework_util import ThreadSafeDict
73 from gslib.progress_callback import ConstructAnnounceText 78 from gslib.progress_callback import ConstructAnnounceText
74 from gslib.progress_callback import FileProgressCallbackHandler 79 from gslib.progress_callback import FileProgressCallbackHandler
75 from gslib.progress_callback import ProgressCallbackWithBackoff 80 from gslib.progress_callback import ProgressCallbackWithBackoff
76 from gslib.resumable_streaming_upload import ResumableStreamingJsonUploadWrapper 81 from gslib.resumable_streaming_upload import ResumableStreamingJsonUploadWrapper
77 from gslib.storage_url import ContainsWildcard 82 from gslib.storage_url import ContainsWildcard
78 from gslib.storage_url import StorageUrlFromString 83 from gslib.storage_url import StorageUrlFromString
79 from gslib.third_party.storage_apitools import storage_v1_messages as apitools_messages 84 from gslib.third_party.storage_apitools import storage_v1_messages as apitools_messages
85 from gslib.tracker_file import DeleteDownloadTrackerFiles
80 from gslib.tracker_file import DeleteTrackerFile 86 from gslib.tracker_file import DeleteTrackerFile
81 from gslib.tracker_file import GetTrackerFilePath 87 from gslib.tracker_file import GetTrackerFilePath
82 from gslib.tracker_file import RaiseUnwritableTrackerFileException 88 from gslib.tracker_file import RaiseUnwritableTrackerFileException
83 from gslib.tracker_file import ReadOrCreateDownloadTrackerFile 89 from gslib.tracker_file import ReadOrCreateDownloadTrackerFile
84 from gslib.tracker_file import TrackerFileType 90 from gslib.tracker_file import TrackerFileType
91 from gslib.tracker_file import WriteDownloadComponentTrackerFile
85 from gslib.translation_helper import AddS3MarkerAclToObjectMetadata 92 from gslib.translation_helper import AddS3MarkerAclToObjectMetadata
86 from gslib.translation_helper import CopyObjectMetadata 93 from gslib.translation_helper import CopyObjectMetadata
87 from gslib.translation_helper import DEFAULT_CONTENT_TYPE 94 from gslib.translation_helper import DEFAULT_CONTENT_TYPE
88 from gslib.translation_helper import GenerationFromUrlAndString 95 from gslib.translation_helper import GenerationFromUrlAndString
89 from gslib.translation_helper import ObjectMetadataFromHeaders 96 from gslib.translation_helper import ObjectMetadataFromHeaders
90 from gslib.translation_helper import PreconditionsFromHeaders 97 from gslib.translation_helper import PreconditionsFromHeaders
91 from gslib.translation_helper import S3MarkerAclFromObjectMetadata 98 from gslib.translation_helper import S3MarkerAclFromObjectMetadata
99 from gslib.util import CheckFreeSpace
100 from gslib.util import CheckMultiprocessingAvailableAndInit
92 from gslib.util import CreateLock 101 from gslib.util import CreateLock
93 from gslib.util import DEFAULT_FILE_BUFFER_SIZE 102 from gslib.util import DEFAULT_FILE_BUFFER_SIZE
103 from gslib.util import DivideAndCeil
94 from gslib.util import GetCloudApiInstance 104 from gslib.util import GetCloudApiInstance
95 from gslib.util import GetFileSize 105 from gslib.util import GetFileSize
96 from gslib.util import GetJsonResumableChunkSize 106 from gslib.util import GetJsonResumableChunkSize
97 from gslib.util import GetMaxRetryDelay 107 from gslib.util import GetMaxRetryDelay
98 from gslib.util import GetNumRetries 108 from gslib.util import GetNumRetries
99 from gslib.util import GetStreamFromFileUrl 109 from gslib.util import GetStreamFromFileUrl
100 from gslib.util import HumanReadableToBytes 110 from gslib.util import HumanReadableToBytes
101 from gslib.util import IS_WINDOWS 111 from gslib.util import IS_WINDOWS
102 from gslib.util import IsCloudSubdirPlaceholder 112 from gslib.util import IsCloudSubdirPlaceholder
103 from gslib.util import MakeHumanReadable 113 from gslib.util import MakeHumanReadable
104 from gslib.util import MIN_SIZE_COMPUTE_LOGGING 114 from gslib.util import MIN_SIZE_COMPUTE_LOGGING
105 from gslib.util import MultiprocessingIsAvailable
106 from gslib.util import ResumableThreshold 115 from gslib.util import ResumableThreshold
107 from gslib.util import TEN_MIB 116 from gslib.util import TEN_MIB
117 from gslib.util import UsingCrcmodExtension
108 from gslib.util import UTF8 118 from gslib.util import UTF8
109 from gslib.wildcard_iterator import CreateWildcardIterator 119 from gslib.wildcard_iterator import CreateWildcardIterator
110 120
111 # pylint: disable=g-import-not-at-top 121 # pylint: disable=g-import-not-at-top
112 if IS_WINDOWS: 122 if IS_WINDOWS:
113 import msvcrt 123 import msvcrt
114 from ctypes import c_int
115 from ctypes import c_uint64
116 from ctypes import c_char_p
117 from ctypes import c_wchar_p
118 from ctypes import windll
119 from ctypes import POINTER
120 from ctypes import WINFUNCTYPE
121 from ctypes import WinError
122 124
123 # Declare copy_helper_opts as a global because namedtuple isn't aware of 125 # Declare copy_helper_opts as a global because namedtuple isn't aware of
124 # assigning to a class member (which breaks pickling done by multiprocessing). 126 # assigning to a class member (which breaks pickling done by multiprocessing).
125 # For details see 127 # For details see
126 # http://stackoverflow.com/questions/16377215/how-to-pickle-a-namedtuple-instance-correctly 128 # http://stackoverflow.com/questions/16377215/how-to-pickle-a-namedtuple-instance-correctly
127 # Similarly can't pickle logger.
128 # pylint: disable=global-at-module-level 129 # pylint: disable=global-at-module-level
129 global global_copy_helper_opts, global_logger 130 global global_copy_helper_opts
130 131
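As a minimal illustration of the pickling constraint noted in the comment above (class and field names below are invented for the example, not gsutil's): pickle can only serialize a namedtuple instance if the namedtuple class is reachable by name at module level, which is why copy_helper_opts is kept as a module global rather than a class member.

import pickle
from collections import namedtuple

# Module-level declaration: pickle can locate the class by name, so instances
# survive the pickling that multiprocessing performs when dispatching work.
ExampleOpts = namedtuple('ExampleOpts', 'canned_acl preserve_acl')

class ExampleHolder(object):
  # Declaring the namedtuple here instead, e.g.
  #   Opts = namedtuple('Opts', 'canned_acl preserve_acl')
  # would make pickle look for a module-level class named 'Opts' and fail.
  pass

print(pickle.loads(pickle.dumps(ExampleOpts('private', False))))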
131 # In-memory map of local files that are currently opened for write. Used to 132 # In-memory map of local files that are currently opened for write. Used to
132 # ensure that if we write to the same file twice (say, for example, because the 133 # ensure that if we write to the same file twice (say, for example, because the
133 # user specified two identical source URLs), the writes occur serially. 134 # user specified two identical source URLs), the writes occur serially.
134 global open_files_map 135 global open_files_map, open_files_lock
135 open_files_map = ( 136 open_files_map = (
136 ThreadSafeDict() if (IS_WINDOWS or not MultiprocessingIsAvailable()[0]) 137 AtomicDict() if not CheckMultiprocessingAvailableAndInit().is_available
137 else ThreadAndProcessSafeDict(multiprocessing.Manager())) 138 else AtomicDict(manager=gslib.util.manager))
139
140 # We don't allow multiple processes on Windows, so using a process-safe lock
141 # would be unnecessary.
142 open_files_lock = CreateLock()
138 143
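Below is a simplified sketch (stand-in names, not the module's actual call sites) of how a shared map plus lock in the style of open_files_map/open_files_lock serializes writes when two source URLs resolve to the same destination file; gsutil itself skips the duplicate copy by raising FileConcurrencySkipError.

import threading

open_files = {}                  # stand-in for open_files_map
files_lock = threading.Lock()    # stand-in for the CreateLock() lock

def write_download(path, data):
  # Register the destination before writing so a concurrent duplicate copy
  # to the same path is skipped instead of interleaving its writes.
  with files_lock:
    if open_files.get(path):
      raise RuntimeError('duplicate concurrent copy to %s skipped' % path)
    open_files[path] = True
  try:
    with open(path, 'wb') as fp:
      fp.write(data)
  finally:
    with files_lock:
      open_files[path] = False

write_download('example.bin', b'payload')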
139 # For debugging purposes; if True, files and objects that fail hash validation 144 # For debugging purposes; if True, files and objects that fail hash validation
140 # will be saved with the below suffix appended. 145 # will be saved with the below suffix appended.
141 _RENAME_ON_HASH_MISMATCH = False 146 _RENAME_ON_HASH_MISMATCH = False
142 _RENAME_ON_HASH_MISMATCH_SUFFIX = '_corrupt' 147 _RENAME_ON_HASH_MISMATCH_SUFFIX = '_corrupt'
143 148
144 PARALLEL_UPLOAD_TEMP_NAMESPACE = ( 149 PARALLEL_UPLOAD_TEMP_NAMESPACE = (
145 u'/gsutil/tmp/parallel_composite_uploads/for_details_see/gsutil_help_cp/') 150 u'/gsutil/tmp/parallel_composite_uploads/for_details_see/gsutil_help_cp/')
146 151
147 PARALLEL_UPLOAD_STATIC_SALT = u""" 152 PARALLEL_UPLOAD_STATIC_SALT = u"""
(...skipping 23 matching lines...)
171 # canned_acl: canned_acl to apply to the uploaded file/component. 176 # canned_acl: canned_acl to apply to the uploaded file/component.
172 # content_type: content-type for final object, used for setting content-type 177 # content_type: content-type for final object, used for setting content-type
173 # of components and final object. 178 # of components and final object.
174 # tracker_file: tracker file for this component. 179 # tracker_file: tracker file for this component.
175 # tracker_file_lock: tracker file lock for tracker file(s). 180 # tracker_file_lock: tracker file lock for tracker file(s).
176 PerformParallelUploadFileToObjectArgs = namedtuple( 181 PerformParallelUploadFileToObjectArgs = namedtuple(
177 'PerformParallelUploadFileToObjectArgs', 182 'PerformParallelUploadFileToObjectArgs',
178 'filename file_start file_length src_url dst_url canned_acl ' 183 'filename file_start file_length src_url dst_url canned_acl '
179 'content_type tracker_file tracker_file_lock') 184 'content_type tracker_file tracker_file_lock')
180 185
186 PerformSlicedDownloadObjectToFileArgs = namedtuple(
187 'PerformSlicedDownloadObjectToFileArgs',
188 'component_num src_url src_obj_metadata dst_url download_file_name '
189 'start_byte end_byte')
190
191 PerformSlicedDownloadReturnValues = namedtuple(
192 'PerformSlicedDownloadReturnValues',
193 'component_num crc32c bytes_transferred server_encoding')
194
181 ObjectFromTracker = namedtuple('ObjectFromTracker', 195 ObjectFromTracker = namedtuple('ObjectFromTracker',
182 'object_name generation') 196 'object_name generation')
183 197
184 # TODO: Refactor this file to be less cumbersome. In particular, some of the 198 # TODO: Refactor this file to be less cumbersome. In particular, some of the
185 # different paths (e.g., uploading a file to an object vs. downloading an 199 # different paths (e.g., uploading a file to an object vs. downloading an
186 # object to a file) could be split into separate files. 200 # object to a file) could be split into separate files.
187 201
188 # Chunk size to use while zipping/unzipping gzip files. 202 # Chunk size to use while zipping/unzipping gzip files.
189 GZIP_CHUNK_SIZE = 8192 203 GZIP_CHUNK_SIZE = 8192
190 204
205 # Number of bytes to wait before updating a sliced download component tracker
206 # file.
207 TRACKERFILE_UPDATE_THRESHOLD = TEN_MIB
208
191 PARALLEL_COMPOSITE_SUGGESTION_THRESHOLD = 150 * 1024 * 1024 209 PARALLEL_COMPOSITE_SUGGESTION_THRESHOLD = 150 * 1024 * 1024
192 210
193 # S3 requires special Multipart upload logic (that we currently don't implement) 211 # S3 requires special Multipart upload logic (that we currently don't implement)
194 # for files > 5GiB in size. 212 # for files > 5GiB in size.
195 S3_MAX_UPLOAD_SIZE = 5 * 1024 * 1024 * 1024 213 S3_MAX_UPLOAD_SIZE = 5 * 1024 * 1024 * 1024
196 214
197 suggested_parallel_composites = False 215 # TODO: Create a multiprocessing framework value allocator, then use it instead
216 # of a dict.
217 global suggested_sliced_transfers, suggested_sliced_transfers_lock
218 suggested_sliced_transfers = (
219 AtomicDict() if not CheckMultiprocessingAvailableAndInit().is_available
220 else AtomicDict(manager=gslib.util.manager))
221 suggested_sliced_transfers_lock = CreateLock()
198 222
199 223
200 class FileConcurrencySkipError(Exception): 224 class FileConcurrencySkipError(Exception):
201 """Raised when skipping a file due to a concurrent, duplicate copy.""" 225 """Raised when skipping a file due to a concurrent, duplicate copy."""
202 226
203 227
204 def _RmExceptionHandler(cls, e): 228 def _RmExceptionHandler(cls, e):
205 """Simple exception handler to allow post-completion status.""" 229 """Simple exception handler to allow post-completion status."""
206 cls.logger.error(str(e)) 230 cls.logger.error(str(e))
207 231
208 232
209 def _ParallelUploadCopyExceptionHandler(cls, e): 233 def _ParallelCopyExceptionHandler(cls, e):
210 """Simple exception handler to allow post-completion status.""" 234 """Simple exception handler to allow post-completion status."""
211 cls.logger.error(str(e)) 235 cls.logger.error(str(e))
212 cls.op_failure_count += 1 236 cls.op_failure_count += 1
213 cls.logger.debug('\n\nEncountered exception while copying:\n%s\n', 237 cls.logger.debug('\n\nEncountered exception while copying:\n%s\n',
214 traceback.format_exc()) 238 traceback.format_exc())
215 239
216 240
217 def _PerformParallelUploadFileToObject(cls, args, thread_state=None): 241 def _PerformParallelUploadFileToObject(cls, args, thread_state=None):
218 """Function argument to Apply for performing parallel composite uploads. 242 """Function argument to Apply for performing parallel composite uploads.
219 243
(...skipping 21 matching lines...)
241 265
242 try: 266 try:
243 if global_copy_helper_opts.canned_acl: 267 if global_copy_helper_opts.canned_acl:
244 # No canned ACL support in JSON, force XML API to be used for 268 # No canned ACL support in JSON, force XML API to be used for
245 # upload/copy operations. 269 # upload/copy operations.
246 orig_prefer_api = gsutil_api.prefer_api 270 orig_prefer_api = gsutil_api.prefer_api
247 gsutil_api.prefer_api = ApiSelector.XML 271 gsutil_api.prefer_api = ApiSelector.XML
248 ret = _UploadFileToObject(args.src_url, fp, args.file_length, 272 ret = _UploadFileToObject(args.src_url, fp, args.file_length,
249 args.dst_url, dst_object_metadata, 273 args.dst_url, dst_object_metadata,
250 preconditions, gsutil_api, cls.logger, cls, 274 preconditions, gsutil_api, cls.logger, cls,
251 _ParallelUploadCopyExceptionHandler, 275 _ParallelCopyExceptionHandler,
252 gzip_exts=None, allow_splitting=False) 276 gzip_exts=None, allow_splitting=False)
253 finally: 277 finally:
254 if global_copy_helper_opts.canned_acl: 278 if global_copy_helper_opts.canned_acl:
255 gsutil_api.prefer_api = orig_prefer_api 279 gsutil_api.prefer_api = orig_prefer_api
256 280
257 component = ret[2] 281 component = ret[2]
258 _AppendComponentTrackerToParallelUploadTrackerFile( 282 _AppendComponentTrackerToParallelUploadTrackerFile(
259 args.tracker_file, component, args.tracker_file_lock) 283 args.tracker_file, component, args.tracker_file_lock)
260 return ret 284 return ret
261 285
(...skipping 372 matching lines...)
634 658
635 def _CreateDigestsFromDigesters(digesters): 659 def _CreateDigestsFromDigesters(digesters):
636 digests = {} 660 digests = {}
637 if digesters: 661 if digesters:
638 for alg in digesters: 662 for alg in digesters:
639 digests[alg] = base64.encodestring( 663 digests[alg] = base64.encodestring(
640 digesters[alg].digest()).rstrip('\n') 664 digesters[alg].digest()).rstrip('\n')
641 return digests 665 return digests
642 666
643 667
644 def _CreateDigestsFromLocalFile(logger, algs, file_name, src_obj_metadata): 668 def _CreateDigestsFromLocalFile(logger, algs, file_name, final_file_name,
669 src_obj_metadata):
645 """Creates a base64 CRC32C and/or MD5 digest from file_name. 670 """Creates a base64 CRC32C and/or MD5 digest from file_name.
646 671
647 Args: 672 Args:
648 logger: for outputting log messages. 673 logger: For outputting log messages.
649 algs: list of algorithms to compute. 674 algs: List of algorithms to compute.
650 file_name: file to digest. 675 file_name: File to digest.
651 src_obj_metadata: metadta of source object. 676 final_file_name: Permanent location to be used for the downloaded file
677 after validation (used for logging).
678 src_obj_metadata: Metadata of source object.
652 679
653 Returns: 680 Returns:
654 Dict of algorithm name : base 64 encoded digest 681 Dict of algorithm name : base 64 encoded digest
655 """ 682 """
656 hash_dict = {} 683 hash_dict = {}
657 if 'md5' in algs: 684 if 'md5' in algs:
658 if src_obj_metadata.size and src_obj_metadata.size > TEN_MIB:
659 logger.info(
660 'Computing MD5 for %s...', file_name)
661 hash_dict['md5'] = md5() 685 hash_dict['md5'] = md5()
662 if 'crc32c' in algs: 686 if 'crc32c' in algs:
663 hash_dict['crc32c'] = crcmod.predefined.Crc('crc-32c') 687 hash_dict['crc32c'] = crcmod.predefined.Crc('crc-32c')
664 with open(file_name, 'rb') as fp: 688 with open(file_name, 'rb') as fp:
665 CalculateHashesFromContents( 689 CalculateHashesFromContents(
666 fp, hash_dict, ProgressCallbackWithBackoff( 690 fp, hash_dict, ProgressCallbackWithBackoff(
667 src_obj_metadata.size, 691 src_obj_metadata.size,
668 FileProgressCallbackHandler( 692 FileProgressCallbackHandler(
669 ConstructAnnounceText('Hashing', file_name), logger).call)) 693 ConstructAnnounceText('Hashing', final_file_name),
694 logger).call))
670 digests = {} 695 digests = {}
671 for alg_name, digest in hash_dict.iteritems(): 696 for alg_name, digest in hash_dict.iteritems():
672 digests[alg_name] = Base64EncodeHash(digest.hexdigest()) 697 digests[alg_name] = Base64EncodeHash(digest.hexdigest())
673 return digests 698 return digests
674 699
675 700
676 def _CheckCloudHashes(logger, src_url, dst_url, src_obj_metadata, 701 def _CheckCloudHashes(logger, src_url, dst_url, src_obj_metadata,
677 dst_obj_metadata): 702 dst_obj_metadata):
678 """Validates integrity of two cloud objects copied via daisy-chain. 703 """Validates integrity of two cloud objects copied via daisy-chain.
679 704
(...skipping 43 matching lines...)
723 748
724 749
725 def _CheckHashes(logger, obj_url, obj_metadata, file_name, digests, 750 def _CheckHashes(logger, obj_url, obj_metadata, file_name, digests,
726 is_upload=False): 751 is_upload=False):
727 """Validates integrity by comparing cloud digest to local digest. 752 """Validates integrity by comparing cloud digest to local digest.
728 753
729 Args: 754 Args:
730 logger: for outputting log messages. 755 logger: for outputting log messages.
731 obj_url: CloudUrl for cloud object. 756 obj_url: CloudUrl for cloud object.
732 obj_metadata: Cloud Object being downloaded from or uploaded to. 757 obj_metadata: Cloud Object being downloaded from or uploaded to.
733 file_name: Local file name on disk being downloaded to or uploaded from. 758 file_name: Local file name on disk being downloaded to or uploaded from
759 (used only for logging).
734 digests: Computed Digests for the object. 760 digests: Computed Digests for the object.
735 is_upload: If true, comparing for an uploaded object (controls logging). 761 is_upload: If true, comparing for an uploaded object (controls logging).
736 762
737 Raises: 763 Raises:
738 CommandException: if cloud digests don't match local digests. 764 CommandException: if cloud digests don't match local digests.
739 """ 765 """
740 local_hashes = digests 766 local_hashes = digests
741 cloud_hashes = {} 767 cloud_hashes = {}
742 if obj_metadata.md5Hash: 768 if obj_metadata.md5Hash:
743 cloud_hashes['md5'] = obj_metadata.md5Hash.rstrip('\n') 769 cloud_hashes['md5'] = obj_metadata.md5Hash.rstrip('\n')
(...skipping 234 matching lines...)
978 request_components, dst_obj_metadata, preconditions=preconditions, 1004 request_components, dst_obj_metadata, preconditions=preconditions,
979 provider=dst_url.scheme, fields=['generation', 'crc32c', 'size']) 1005 provider=dst_url.scheme, fields=['generation', 'crc32c', 'size'])
980 1006
981 try: 1007 try:
982 # Make sure only to delete things that we know were successfully 1008 # Make sure only to delete things that we know were successfully
983 # uploaded (as opposed to all of the objects that we attempted to 1009 # uploaded (as opposed to all of the objects that we attempted to
984 # create) so that we don't delete any preexisting objects, except for 1010 # create) so that we don't delete any preexisting objects, except for
985 # those that were uploaded by a previous, failed run and have since 1011 # those that were uploaded by a previous, failed run and have since
986 # changed (but still have an old generation lying around). 1012 # changed (but still have an old generation lying around).
987 objects_to_delete = components + existing_objects_to_delete 1013 objects_to_delete = components + existing_objects_to_delete
988 command_obj.Apply(_DeleteObjectFn, objects_to_delete, _RmExceptionHandler, 1014 command_obj.Apply(
989 arg_checker=gslib.command.DummyArgChecker, 1015 _DeleteTempComponentObjectFn, objects_to_delete, _RmExceptionHandler,
990 parallel_operations_override=True) 1016 arg_checker=gslib.command.DummyArgChecker,
1017 parallel_operations_override=True)
991 except Exception: # pylint: disable=broad-except 1018 except Exception: # pylint: disable=broad-except
992 # If some of the delete calls fail, don't cause the whole command to 1019 # If some of the delete calls fail, don't cause the whole command to
993 # fail. The copy was successful iff the compose call succeeded, so 1020 # fail. The copy was successful iff the compose call succeeded, so
994 # reduce this to a warning. 1021 # reduce this to a warning.
995 logging.warning( 1022 logging.warning(
996 'Failed to delete some of the following temporary objects:\n' + 1023 'Failed to delete some of the following temporary objects:\n' +
997 '\n'.join(dst_args.keys())) 1024 '\n'.join(dst_args.keys()))
998 finally: 1025 finally:
999 with tracker_file_lock: 1026 with tracker_file_lock:
1000 if os.path.exists(tracker_file): 1027 if os.path.exists(tracker_file):
(...skipping 17 matching lines...)
1018 logger: for outputting log messages. 1045 logger: for outputting log messages.
1019 allow_splitting: If false, then this function returns false. 1046 allow_splitting: If false, then this function returns false.
1020 src_url: FileUrl corresponding to a local file. 1047 src_url: FileUrl corresponding to a local file.
1021 dst_url: CloudUrl corresponding to destination cloud object. 1048 dst_url: CloudUrl corresponding to destination cloud object.
1022 file_size: The size of the source file, in bytes. 1049 file_size: The size of the source file, in bytes.
1023 canned_acl: Canned ACL to apply to destination object, if any. 1050 canned_acl: Canned ACL to apply to destination object, if any.
1024 1051
1025 Returns: 1052 Returns:
1026 True iff a parallel upload should be performed on the source file. 1053 True iff a parallel upload should be performed on the source file.
1027 """ 1054 """
1028 global suggested_parallel_composites 1055 global suggested_slice_transfers, suggested_sliced_transfers_lock
1029 parallel_composite_upload_threshold = HumanReadableToBytes(config.get( 1056 parallel_composite_upload_threshold = HumanReadableToBytes(config.get(
1030 'GSUtil', 'parallel_composite_upload_threshold', 1057 'GSUtil', 'parallel_composite_upload_threshold',
1031 DEFAULT_PARALLEL_COMPOSITE_UPLOAD_THRESHOLD)) 1058 DEFAULT_PARALLEL_COMPOSITE_UPLOAD_THRESHOLD))
1032 1059
1033 all_factors_but_size = ( 1060 all_factors_but_size = (
1034 allow_splitting # Don't split the pieces multiple times. 1061 allow_splitting # Don't split the pieces multiple times.
1035 and not src_url.IsStream() # We can't partition streams. 1062 and not src_url.IsStream() # We can't partition streams.
1036 and dst_url.scheme == 'gs' # Compose is only for gs. 1063 and dst_url.scheme == 'gs' # Compose is only for gs.
1037 and not canned_acl) # TODO: Implement canned ACL support for compose. 1064 and not canned_acl) # TODO: Implement canned ACL support for compose.
1038 1065
1039 # Since parallel composite uploads are disabled by default, make user aware of 1066 # Since parallel composite uploads are disabled by default, make user aware of
1040 # them. 1067 # them.
1041 # TODO: Once compiled crcmod is being distributed by major Linux distributions 1068 # TODO: Once compiled crcmod is being distributed by major Linux distributions
1042 # remove this check. 1069 # remove this check.
1043 if (all_factors_but_size and parallel_composite_upload_threshold == 0 1070 if (all_factors_but_size and parallel_composite_upload_threshold == 0
1044 and file_size >= PARALLEL_COMPOSITE_SUGGESTION_THRESHOLD 1071 and file_size >= PARALLEL_COMPOSITE_SUGGESTION_THRESHOLD):
1045 and not suggested_parallel_composites): 1072 with suggested_sliced_transfers_lock:
1046 logger.info('\n'.join(textwrap.wrap( 1073 if not suggested_sliced_transfers.get('suggested'):
1047 '==> NOTE: You are uploading one or more large file(s), which would ' 1074 logger.info('\n'.join(textwrap.wrap(
1048 'run significantly faster if you enable parallel composite uploads. ' 1075 '==> NOTE: You are uploading one or more large file(s), which '
1049 'This feature can be enabled by editing the ' 1076 'would run significantly faster if you enable parallel composite '
1050 '"parallel_composite_upload_threshold" value in your .boto ' 1077 'uploads. This feature can be enabled by editing the '
1051 'configuration file. However, note that if you do this you and any ' 1078 '"parallel_composite_upload_threshold" value in your .boto '
1052 'users that download such composite files will need to have a compiled ' 1079 'configuration file. However, note that if you do this you and any '
1053 'crcmod installed (see "gsutil help crcmod").')) + '\n') 1080 'users that download such composite files will need to have a '
1054 suggested_parallel_composites = True 1081 'compiled crcmod installed (see "gsutil help crcmod").')) + '\n')
1082 suggested_sliced_transfers['suggested'] = True
1055 1083
1056 return (all_factors_but_size 1084 return (all_factors_but_size
1057 and parallel_composite_upload_threshold > 0 1085 and parallel_composite_upload_threshold > 0
1058 and file_size >= parallel_composite_upload_threshold) 1086 and file_size >= parallel_composite_upload_threshold)
1059 1087
1060 1088
1061 def ExpandUrlToSingleBlr(url_str, gsutil_api, debug, project_id, 1089 def ExpandUrlToSingleBlr(url_str, gsutil_api, debug, project_id,
1062 treat_nonexistent_object_as_subdir=False): 1090 treat_nonexistent_object_as_subdir=False):
1063 """Expands wildcard if present in url_str. 1091 """Expands wildcard if present in url_str.
1064 1092
(...skipping 32 matching lines...)
1097 storage_url = StorageUrlFromString(url_str) 1125 storage_url = StorageUrlFromString(url_str)
1098 1126
1099 # Handle non-wildcarded URL. 1127 # Handle non-wildcarded URL.
1100 if storage_url.IsFileUrl(): 1128 if storage_url.IsFileUrl():
1101 return (storage_url, storage_url.IsDirectory()) 1129 return (storage_url, storage_url.IsDirectory())
1102 1130
1103 # At this point we have a cloud URL. 1131 # At this point we have a cloud URL.
1104 if storage_url.IsBucket(): 1132 if storage_url.IsBucket():
1105 return (storage_url, True) 1133 return (storage_url, True)
1106 1134
1107 # For object/prefix URLs check 3 cases: (a) if the name ends with '/' treat 1135 # For object/prefix URLs, there are four cases that indicate the destination
1108 # as a subdir; otherwise, use the wildcard iterator with url to 1136 # is a cloud subdirectory; these are always considered to be an existing
1109 # find if (b) there's a Prefix matching url, or (c) name is of form 1137 # container. Checking each case allows gsutil to provide Unix-like
1110 # dir_$folder$ (and in both these cases also treat dir as a subdir). 1138 # destination folder semantics, but requires up to three HTTP calls, noted
1111 # Cloud subdirs are always considered to be an existing container. 1139 # below.
1140
1141 # Case 1: If a placeholder object ending with '/' exists.
1112 if IsCloudSubdirPlaceholder(storage_url): 1142 if IsCloudSubdirPlaceholder(storage_url):
1113 return (storage_url, True) 1143 return (storage_url, True)
1114 1144
1115 # Check for the special case where we have a folder marker object. 1145 # HTTP call to make an eventually consistent check for a matching prefix,
1116 folder_expansion = CreateWildcardIterator( 1146 # _$folder$, or empty listing.
1117 storage_url.versionless_url_string + '_$folder$', gsutil_api, 1147 expansion_empty = True
1118 debug=debug, project_id=project_id).IterAll( 1148 list_iterator = gsutil_api.ListObjects(
1119 bucket_listing_fields=['name']) 1149 storage_url.bucket_name, prefix=storage_url.object_name, delimiter='/',
1120 for blr in folder_expansion: 1150 provider=storage_url.scheme, fields=['prefixes', 'items/name'])
1121 return (storage_url, True) 1151 for obj_or_prefix in list_iterator:
1152 # To conserve HTTP calls for the common case, we make a single listing
1153 # that covers prefixes and object names. Listing object names covers the
1154 # _$folder$ case and the nonexistent-object-as-subdir case. However, if
1155 # there are many existing objects for which the target URL is an exact
1156 # prefix, this listing could be paginated and span multiple HTTP calls.
1157 # If this case becomes common, we could heuristically abort the
1158 # listing operation after the first page of results and just query for the
1159 # _$folder$ object directly using GetObjectMetadata.
1160 expansion_empty = False
1122 1161
1123 blr_expansion = CreateWildcardIterator(url_str, gsutil_api, 1162 if obj_or_prefix.datatype == CloudApi.CsObjectOrPrefixType.PREFIX:
1124 debug=debug, 1163 # Case 2: If there is a matching prefix when listing the destination URL.
1125 project_id=project_id).IterAll( 1164 return (storage_url, True)
1126 bucket_listing_fields=['name']) 1165 elif (obj_or_prefix.datatype == CloudApi.CsObjectOrPrefixType.OBJECT and
1127 expansion_empty = True 1166 obj_or_prefix.data.name == storage_url.object_name + '_$folder$'):
1128 for blr in blr_expansion: 1167 # Case 3: If a placeholder object matching destination + _$folder$
1129 expansion_empty = False 1168 # exists.
1130 if blr.IsPrefix():
1131 return (storage_url, True) 1169 return (storage_url, True)
1132 1170
1133 return (storage_url, 1171 # Case 4: If no objects/prefixes matched, and nonexistent objects should be
1134 expansion_empty and treat_nonexistent_object_as_subdir) 1172 # treated as subdirectories.
1173 return (storage_url, expansion_empty and treat_nonexistent_object_as_subdir)
1135 1174
1136 1175
1137 def FixWindowsNaming(src_url, dst_url): 1176 def FixWindowsNaming(src_url, dst_url):
1138 """Translates Windows pathnames to cloud pathnames. 1177 """Translates Windows pathnames to cloud pathnames.
1139 1178
1140 Rewrites the destination URL built by ConstructDstUrl(). 1179 Rewrites the destination URL built by ConstructDstUrl().
1141 1180
1142 Args: 1181 Args:
1143 src_url: Source StorageUrl to be copied. 1182 src_url: Source StorageUrl to be copied.
1144 dst_url: The destination StorageUrl built by ConstructDstUrl(). 1183 dst_url: The destination StorageUrl built by ConstructDstUrl().
(...skipping 90 matching lines...)
1235 end_time = time.time() 1274 end_time = time.time()
1236 1275
1237 result_url = dst_url.Clone() 1276 result_url = dst_url.Clone()
1238 result_url.generation = GenerationFromUrlAndString(result_url, 1277 result_url.generation = GenerationFromUrlAndString(result_url,
1239 dst_obj.generation) 1278 dst_obj.generation)
1240 1279
1241 return (end_time - start_time, src_obj_metadata.size, result_url, 1280 return (end_time - start_time, src_obj_metadata.size, result_url,
1242 dst_obj.md5Hash) 1281 dst_obj.md5Hash)
1243 1282
1244 1283
1245 def _CheckFreeSpace(path):
1246 """Return path/drive free space (in bytes)."""
1247 if IS_WINDOWS:
1248 # pylint: disable=g-import-not-at-top
1249 try:
1250 # pylint: disable=invalid-name
1251 get_disk_free_space_ex = WINFUNCTYPE(c_int, c_wchar_p,
1252 POINTER(c_uint64),
1253 POINTER(c_uint64),
1254 POINTER(c_uint64))
1255 get_disk_free_space_ex = get_disk_free_space_ex(
1256 ('GetDiskFreeSpaceExW', windll.kernel32), (
1257 (1, 'lpszPathName'),
1258 (2, 'lpFreeUserSpace'),
1259 (2, 'lpTotalSpace'),
1260 (2, 'lpFreeSpace'),))
1261 except AttributeError:
1262 get_disk_free_space_ex = WINFUNCTYPE(c_int, c_char_p,
1263 POINTER(c_uint64),
1264 POINTER(c_uint64),
1265 POINTER(c_uint64))
1266 get_disk_free_space_ex = get_disk_free_space_ex(
1267 ('GetDiskFreeSpaceExA', windll.kernel32), (
1268 (1, 'lpszPathName'),
1269 (2, 'lpFreeUserSpace'),
1270 (2, 'lpTotalSpace'),
1271 (2, 'lpFreeSpace'),))
1272
1273 def GetDiskFreeSpaceExErrCheck(result, unused_func, args):
1274 if not result:
1275 raise WinError()
1276 return args[1].value
1277 get_disk_free_space_ex.errcheck = GetDiskFreeSpaceExErrCheck
1278
1279 return get_disk_free_space_ex(os.getenv('SystemDrive'))
1280 else:
1281 (_, f_frsize, _, _, f_bavail, _, _, _, _, _) = os.statvfs(path)
1282 return f_frsize * f_bavail
1283
1284
1285 def _SetContentTypeFromFile(src_url, dst_obj_metadata): 1284 def _SetContentTypeFromFile(src_url, dst_obj_metadata):
1286 """Detects and sets Content-Type if src_url names a local file. 1285 """Detects and sets Content-Type if src_url names a local file.
1287 1286
1288 Args: 1287 Args:
1289 src_url: Source StorageUrl. 1288 src_url: Source StorageUrl.
1290 dst_obj_metadata: Object-specific metadata that should be overidden during 1289 dst_obj_metadata: Object-specific metadata that should be overidden during
1291 the copy. 1290 the copy.
1292 """ 1291 """
1293 # contentType == '' if user requested default type. 1292 # contentType == '' if user requested default type.
1294 if (dst_obj_metadata.contentType is None and src_url.IsFileUrl() 1293 if (dst_obj_metadata.contentType is None and src_url.IsFileUrl()
(...skipping 207 matching lines...)
1502 # TODO: Compress using a streaming model as opposed to all at once here. 1501 # TODO: Compress using a streaming model as opposed to all at once here.
1503 if src_obj_size >= MIN_SIZE_COMPUTE_LOGGING: 1502 if src_obj_size >= MIN_SIZE_COMPUTE_LOGGING:
1504 logger.info( 1503 logger.info(
1505 'Compressing %s (to tmp)...', src_url) 1504 'Compressing %s (to tmp)...', src_url)
1506 (gzip_fh, gzip_path) = tempfile.mkstemp() 1505 (gzip_fh, gzip_path) = tempfile.mkstemp()
1507 gzip_fp = None 1506 gzip_fp = None
1508 try: 1507 try:
1509 # Check for temp space. Assume the compressed object is at most 2x 1508 # Check for temp space. Assume the compressed object is at most 2x
1510 # the size of the object (normally should compress to smaller than 1509 # the size of the object (normally should compress to smaller than
1511 # the object) 1510 # the object)
1512 if _CheckFreeSpace(gzip_path) < 2*int(src_obj_size): 1511 if CheckFreeSpace(gzip_path) < 2*int(src_obj_size):
1513 raise CommandException('Inadequate temp space available to compress ' 1512 raise CommandException('Inadequate temp space available to compress '
1514 '%s. See the CHANGING TEMP DIRECTORIES section ' 1513 '%s. See the CHANGING TEMP DIRECTORIES section '
1515 'of "gsutil help cp" for more info.' % src_url) 1514 'of "gsutil help cp" for more info.' % src_url)
1516 gzip_fp = gzip.open(gzip_path, 'wb') 1515 gzip_fp = gzip.open(gzip_path, 'wb')
1517 data = src_obj_filestream.read(GZIP_CHUNK_SIZE) 1516 data = src_obj_filestream.read(GZIP_CHUNK_SIZE)
1518 while data: 1517 while data:
1519 gzip_fp.write(data) 1518 gzip_fp.write(data)
1520 data = src_obj_filestream.read(GZIP_CHUNK_SIZE) 1519 data = src_obj_filestream.read(GZIP_CHUNK_SIZE)
1521 finally: 1520 finally:
1522 if gzip_fp: 1521 if gzip_fp:
(...skipping 127 matching lines...)
1650 result_url = dst_url.Clone() 1649 result_url = dst_url.Clone()
1651 1650
1652 result_url.generation = uploaded_object.generation 1651 result_url.generation = uploaded_object.generation
1653 result_url.generation = GenerationFromUrlAndString( 1652 result_url.generation = GenerationFromUrlAndString(
1654 result_url, uploaded_object.generation) 1653 result_url, uploaded_object.generation)
1655 1654
1656 return (elapsed_time, uploaded_object.size, result_url, 1655 return (elapsed_time, uploaded_object.size, result_url,
1657 uploaded_object.md5Hash) 1656 uploaded_object.md5Hash)
1658 1657
1659 1658
1660 # TODO: Refactor this long function into smaller pieces. 1659 def _GetDownloadFile(dst_url, src_obj_metadata, logger):
1661 # pylint: disable=too-many-statements 1660 """Creates a new download file, and deletes the file that will be replaced.
1662 def _DownloadObjectToFile(src_url, src_obj_metadata, dst_url, 1661
1663 gsutil_api, logger, test_method=None): 1662 Names and creates a temporary file for this download. Also, if there is an
1664 """Downloads an object to a local file. 1663 existing file at the path where this file will be placed after the download
1664 is completed, that file will be deleted.
1665 1665
1666 Args: 1666 Args:
1667 src_url: Source CloudUrl. 1667 dst_url: Destination FileUrl.
1668 src_obj_metadata: Metadata from the source object. 1668 src_obj_metadata: Metadata from the source object.
1669 dst_url: Destination FileUrl.
1670 gsutil_api: gsutil Cloud API instance to use for the download.
1671 logger: for outputting log messages. 1669 logger: for outputting log messages.
1672 test_method: Optional test method for modifying the file before validation 1670
1673 during unit tests.
1674 Returns: 1671 Returns:
1675 (elapsed_time, bytes_transferred, dst_url, md5), excluding overhead like 1672 (download_file_name, need_to_unzip)
1676 initial GET. 1673 download_file_name: The name of the temporary file to which the object will
1677 1674 be downloaded.
1678 Raises: 1675 need_to_unzip: If true, a temporary zip file was used and must be
1679 CommandException: if errors encountered. 1676 uncompressed as part of validation.
1680 """ 1677 """
1681 global open_files_map 1678 dir_name = os.path.dirname(dst_url.object_name)
1682 file_name = dst_url.object_name
1683 dir_name = os.path.dirname(file_name)
1684 if dir_name and not os.path.exists(dir_name): 1679 if dir_name and not os.path.exists(dir_name):
1685 # Do dir creation in try block so can ignore case where dir already 1680 # Do dir creation in try block so can ignore case where dir already
1686 # exists. This is needed to avoid a race condition when running gsutil 1681 # exists. This is needed to avoid a race condition when running gsutil
1687 # -m cp. 1682 # -m cp.
1688 try: 1683 try:
1689 os.makedirs(dir_name) 1684 os.makedirs(dir_name)
1690 except OSError, e: 1685 except OSError, e:
1691 if e.errno != errno.EEXIST: 1686 if e.errno != errno.EEXIST:
1692 raise 1687 raise
1693 api_selector = gsutil_api.GetApiSelector(provider=src_url.scheme) 1688
1689 need_to_unzip = False
1694 # For gzipped objects download to a temp file and unzip. For the XML API, 1690 # For gzipped objects download to a temp file and unzip. For the XML API,
1695 # the represents the result of a HEAD request. For the JSON API, this is 1691 # this represents the result of a HEAD request. For the JSON API, this is
1696 # the stored encoding which the service may not respect. However, if the 1692 # the stored encoding which the service may not respect. However, if the
1697 # server sends decompressed bytes for a file that is stored compressed 1693 # server sends decompressed bytes for a file that is stored compressed
1698 # (double compressed case), there is no way we can validate the hash and 1694 # (double compressed case), there is no way we can validate the hash and
1699 # we will fail our hash check for the object. 1695 # we will fail our hash check for the object.
1700 if (src_obj_metadata.contentEncoding and 1696 if (src_obj_metadata.contentEncoding and
1701 src_obj_metadata.contentEncoding.lower().endswith('gzip')): 1697 src_obj_metadata.contentEncoding.lower().endswith('gzip')):
1702 # We can't use tempfile.mkstemp() here because we need a predictable 1698 need_to_unzip = True
1703 # filename for resumable downloads. 1699 download_file_name = _GetDownloadTempZipFileName(dst_url)
1704 download_file_name = _GetDownloadZipFileName(file_name)
1705 logger.info( 1700 logger.info(
1706 'Downloading to temp gzip filename %s', download_file_name) 1701 'Downloading to temp gzip filename %s', download_file_name)
1707 need_to_unzip = True
1708 else: 1702 else:
1709 download_file_name = file_name 1703 download_file_name = _GetDownloadTempFileName(dst_url)
1710 need_to_unzip = False 1704
1711 1705 # If a file exists at the permanent destination (where the file will be moved
1712 if download_file_name.endswith(dst_url.delim): 1706 # after the download is completed), delete it here to reduce disk space
1713 logger.warn('\n'.join(textwrap.wrap( 1707 # requirements.
1714 'Skipping attempt to download to filename ending with slash (%s). This ' 1708 if os.path.exists(dst_url.object_name):
1715 'typically happens when using gsutil to download from a subdirectory ' 1709 os.unlink(dst_url.object_name)
1716 'created by the Cloud Console (https://cloud.google.com/console)' 1710
1717 % download_file_name))) 1711 # Downloads open the temporary download file in r+b mode, which requires it
1718 return (0, 0, dst_url, '') 1712 # to already exist, so we create it here if it doesn't exist already.
1719 1713 fp = open(download_file_name, 'ab')
1720 # Set up hash digesters. 1714 fp.close()
1715 return download_file_name, need_to_unzip
1716
1717
1718 def _ShouldDoSlicedDownload(download_strategy, src_obj_metadata,
1719 allow_splitting, logger):
1720 """Determines whether the sliced download strategy should be used.
1721
1722 Args:
1723 download_strategy: CloudApi download strategy.
1724 src_obj_metadata: Metadata from the source object.
1725 allow_splitting: If false, then this function returns false.
1726 logger: logging.Logger for log message output.
1727
1728 Returns:
1729 True iff a sliced download should be performed on the source file.
1730 """
1731 sliced_object_download_threshold = HumanReadableToBytes(config.get(
1732 'GSUtil', 'sliced_object_download_threshold',
1733 DEFAULT_SLICED_OBJECT_DOWNLOAD_THRESHOLD))
1734
1735 max_components = config.getint(
1736 'GSUtil', 'sliced_object_download_max_components',
1737 DEFAULT_SLICED_OBJECT_DOWNLOAD_MAX_COMPONENTS)
1738
1739 # Don't use sliced download if it will prevent us from performing an
1740 # integrity check.
1741 check_hashes_config = config.get(
1742 'GSUtil', 'check_hashes', CHECK_HASH_IF_FAST_ELSE_FAIL)
1743 parallel_hashing = src_obj_metadata.crc32c and UsingCrcmodExtension(crcmod)
1744 hashing_okay = parallel_hashing or check_hashes_config == CHECK_HASH_NEVER
1745
1746 use_slice = (
1747 allow_splitting
1748 and download_strategy is not CloudApi.DownloadStrategy.ONE_SHOT
1749 and max_components > 1
1750 and hashing_okay
1751 and sliced_object_download_threshold > 0
1752 and src_obj_metadata.size >= sliced_object_download_threshold)
1753
1754 if (not use_slice
1755 and src_obj_metadata.size >= PARALLEL_COMPOSITE_SUGGESTION_THRESHOLD
1756 and not UsingCrcmodExtension(crcmod)
1757 and check_hashes_config != CHECK_HASH_NEVER):
1758 with suggested_sliced_transfers_lock:
1759 if not suggested_sliced_transfers.get('suggested'):
1760 logger.info('\n'.join(textwrap.wrap(
1761 '==> NOTE: You are downloading one or more large file(s), which '
1762 'would run significantly faster if you enabled sliced object '
1763 'downloads. This feature is enabled by default but requires that '
1764 'compiled crcmod be installed (see "gsutil help crcmod").')) + '\n')
1765 suggested_sliced_transfers['suggested'] = True
1766
1767 return use_slice
1768
1769
1770 def _PerformSlicedDownloadObjectToFile(cls, args, thread_state=None):
1771 """Function argument to Apply for performing sliced downloads.
1772
1773 Args:
1774 cls: Calling Command class.
1775 args: PerformSlicedDownloadObjectToFileArgs tuple describing the target.
1776 thread_state: gsutil Cloud API instance to use for the operation.
1777
1778 Returns:
1779 PerformSlicedDownloadReturnValues named-tuple filled with:
1780 component_num: The component number for this download.
1781 crc32c: CRC32C hash value (integer) of the downloaded bytes.
1782 bytes_transferred: The number of bytes transferred, potentially less
1783 than the component size if the download was resumed.
1784 """
1785 gsutil_api = GetCloudApiInstance(cls, thread_state=thread_state)
1721 hash_algs = GetDownloadHashAlgs( 1786 hash_algs = GetDownloadHashAlgs(
1722 logger, src_has_md5=src_obj_metadata.md5Hash, 1787 cls.logger, consider_crc32c=args.src_obj_metadata.crc32c)
1723 src_has_crc32c=src_obj_metadata.crc32c)
1724 digesters = dict((alg, hash_algs[alg]()) for alg in hash_algs or {}) 1788 digesters = dict((alg, hash_algs[alg]()) for alg in hash_algs or {})
1725 1789
1726 fp = None 1790 (bytes_transferred, server_encoding) = (
1727 # Tracks whether the server used a gzip encoding. 1791 _DownloadObjectToFileResumable(args.src_url, args.src_obj_metadata,
1792 args.dst_url, args.download_file_name,
1793 gsutil_api, cls.logger, digesters,
1794 component_num=args.component_num,
1795 start_byte=args.start_byte,
1796 end_byte=args.end_byte))
1797
1798 crc32c_val = None
1799 if 'crc32c' in digesters:
1800 crc32c_val = digesters['crc32c'].crcValue
1801 return PerformSlicedDownloadReturnValues(
1802 args.component_num, crc32c_val, bytes_transferred, server_encoding)
1803
1804
1805 def _MaintainSlicedDownloadTrackerFiles(src_obj_metadata, dst_url,
1806 download_file_name, logger,
1807 api_selector, num_components):
1808 """Maintains sliced download tracker files in order to permit resumability.
1809
1810 Reads or creates a sliced download tracker file representing this object
1811 download. Upon an attempt at cross-process resumption, the contents of the
1812 sliced download tracker file are verified to make sure a resumption is
1813 possible and appropriate. In the case that a resumption should not be
1814 attempted, existing component tracker files are deleted (to prevent child
1815 processes from attempting resumption), and a new sliced download tracker
1816 file is created.
1817
1818 Args:
1819 src_obj_metadata: Metadata from the source object. Must include etag and
1820 generation.
1821 dst_url: Destination FileUrl.
1822 download_file_name: Temporary file name to be used for the download.
1823 logger: for outputting log messages.
1824 api_selector: The Cloud API implementation used.
1825 num_components: The number of components to perform this download with.
1826 """
1827 assert src_obj_metadata.etag
1828 tracker_file = None
1829
1830 # Only can happen if the resumable threshold is set higher than the
1831 # parallel transfer threshold.
1832 if src_obj_metadata.size < ResumableThreshold():
1833 return
1834
1835 tracker_file_name = GetTrackerFilePath(dst_url,
1836 TrackerFileType.SLICED_DOWNLOAD,
1837 api_selector)
1838
1839 # Check to see if we should attempt resuming the download.
1840 try:
1841 fp = open(download_file_name, 'rb')
1842 existing_file_size = GetFileSize(fp)
1843 # A parallel resumption should be attempted only if the destination file
1844 # size is exactly the same as the source size and the tracker file matches.
1845 if existing_file_size == src_obj_metadata.size:
1846 tracker_file = open(tracker_file_name, 'r')
1847 tracker_file_data = json.load(tracker_file)
1848 if (tracker_file_data['etag'] == src_obj_metadata.etag and
1849 tracker_file_data['generation'] == src_obj_metadata.generation and
1850 tracker_file_data['num_components'] == num_components):
1851 return
1852 else:
1853 tracker_file.close()
1854 logger.warn('Sliced download tracker file doesn\'t match for '
1855 'download of %s. Restarting download from scratch.' %
1856 dst_url.object_name)
1857
1858 except (IOError, ValueError) as e:
1859 # Ignore non-existent file (happens first time a download
1860 # is attempted on an object), but warn user for other errors.
1861 if isinstance(e, ValueError) or e.errno != errno.ENOENT:
1862 logger.warn('Couldn\'t read sliced download tracker file (%s): %s. '
1863 'Restarting download from scratch.' %
1864 (tracker_file_name, str(e)))
1865 finally:
1866 if fp:
1867 fp.close()
1868 if tracker_file:
1869 tracker_file.close()
1870
1871 # Delete component tracker files to guarantee download starts from scratch.
1872 DeleteDownloadTrackerFiles(dst_url, api_selector)
1873
1874 # Create a new sliced download tracker file to represent this download.
1875 try:
1876 with open(tracker_file_name, 'w') as tracker_file:
1877 tracker_file_data = {'etag': src_obj_metadata.etag,
1878 'generation': src_obj_metadata.generation,
1879 'num_components': num_components}
1880 tracker_file.write(json.dumps(tracker_file_data))
1881 except IOError as e:
1882 RaiseUnwritableTrackerFileException(tracker_file_name, e.strerror)
1883
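For reference, the sliced download tracker file written above is just a small JSON document holding the three fields shown; the values in this sketch are made up.

import json

tracker_file_data = {'etag': 'example-etag-abc123',
                     'generation': 1234567890123456,
                     'num_components': 4}
print(json.dumps(tracker_file_data, sort_keys=True))
# {"etag": "example-etag-abc123", "generation": 1234567890123456, "num_components": 4}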
1884
1885 class SlicedDownloadFileWrapper(object):
1886 """Wraps a file object to be used in GetObjectMedia for sliced downloads.
1887
1888 In order to allow resumability, the file object used by each thread in a
1889 sliced object download should be wrapped using SlicedDownloadFileWrapper.
1890 Passing a SlicedDownloadFileWrapper object to GetObjectMedia will allow the
1891 download component tracker file for this component to be updated periodically,
1892 while the downloaded bytes are normally written to file.
1893 """
1894
1895 def __init__(self, fp, tracker_file_name, src_obj_metadata, start_byte,
1896 end_byte):
1897 """Initializes the SlicedDownloadFileWrapper.
1898
1899 Args:
1900 fp: The already-open file object to be used for writing in
1901 GetObjectMedia. Data will be written to file starting at the current
1902 seek position.
1903 tracker_file_name: The name of the tracker file for this component.
1904 src_obj_metadata: Metadata from the source object. Must include etag and
1905 generation.
1906 start_byte: The first byte to be downloaded for this parallel component.
1907 end_byte: The last byte to be downloaded for this parallel component.
1908 """
1909 self._orig_fp = fp
1910 self._tracker_file_name = tracker_file_name
1911 self._src_obj_metadata = src_obj_metadata
1912 self._last_tracker_file_byte = None
1913 self._start_byte = start_byte
1914 self._end_byte = end_byte
1915
1916 def write(self, data): # pylint: disable=invalid-name
1917 current_file_pos = self._orig_fp.tell()
1918 assert (self._start_byte <= current_file_pos and
1919 current_file_pos + len(data) <= self._end_byte + 1)
1920
1921 self._orig_fp.write(data)
1922 current_file_pos = self._orig_fp.tell()
1923
1924 threshold = TRACKERFILE_UPDATE_THRESHOLD
1925 if (self._last_tracker_file_byte is None or
1926 current_file_pos - self._last_tracker_file_byte > threshold or
1927 current_file_pos == self._end_byte + 1):
1928 WriteDownloadComponentTrackerFile(
1929 self._tracker_file_name, self._src_obj_metadata, current_file_pos)
1930 self._last_tracker_file_byte = current_file_pos
1931
1932 def seek(self, offset, whence=os.SEEK_SET): # pylint: disable=invalid-name
1933 if whence == os.SEEK_END:
1934 self._orig_fp.seek(offset + self._end_byte + 1)
1935 else:
1936 self._orig_fp.seek(offset, whence)
1937 assert self._start_byte <= self._orig_fp.tell() <= self._end_byte + 1
1938
1939 def tell(self): # pylint: disable=invalid-name
1940 return self._orig_fp.tell()
1941
1942 def flush(self): # pylint: disable=invalid-name
1943 self._orig_fp.flush()
1944
1945 def close(self): # pylint: disable=invalid-name
1946 if self._orig_fp:
1947 self._orig_fp.close()
1948
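A toy analogue of the wrapper's write path, runnable against an in-memory buffer: it enforces the [start_byte, end_byte] invariant and "checkpoints" progress at most once per threshold bytes, which is where the real class writes the component tracker file. The class name, the 100-byte buffer, and the 20-byte threshold below are illustrative only:

import io

class CheckpointingWriter(object):
    """Toy stand-in for SlicedDownloadFileWrapper: writes to an open file
    and records progress at most once per `threshold` bytes."""

    def __init__(self, fp, start_byte, end_byte, threshold):
        self._fp, self._start, self._end = fp, start_byte, end_byte
        self._threshold = threshold
        self._last_checkpoint = None
        self.checkpoints = []
        fp.seek(start_byte)

    def write(self, data):
        pos = self._fp.tell()
        assert self._start <= pos and pos + len(data) <= self._end + 1
        self._fp.write(data)
        pos = self._fp.tell()
        if (self._last_checkpoint is None or
                pos - self._last_checkpoint > self._threshold or
                pos == self._end + 1):
            self.checkpoints.append(pos)  # Real code persists this offset.
            self._last_checkpoint = pos

buf = io.BytesIO(bytearray(100))          # Pretend 100-byte destination file.
writer = CheckpointingWriter(buf, start_byte=25, end_byte=74, threshold=20)
for _ in range(5):
    writer.write(b'x' * 10)               # Five 10-byte chunks fill bytes 25-74.
print(writer.checkpoints)                 # [35, 65, 75]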
1949
1950 def _PartitionObject(src_url, src_obj_metadata, dst_url,
1951 download_file_name):
1952 """Partitions an object into components to be downloaded.
1953
1954 Each component is a byte range of the object. The byte ranges
1955 of the returned components are mutually exclusive and collectively
1956 exhaustive. The byte ranges are inclusive at both end points.
1957
1958 Args:
1959 src_url: Source CloudUrl.
1960 src_obj_metadata: Metadata from the source object.
1961 dst_url: Destination FileUrl.
1962 download_file_name: Temporary file name to be used for the download.
1963
1964 Returns:
1965 components_to_download: A list of PerformSlicedDownloadObjectToFileArgs
1966 to be used in Apply for the sliced download.
1967 """
1968 sliced_download_component_size = HumanReadableToBytes(
1969 config.get('GSUtil', 'sliced_object_download_component_size',
1970 DEFAULT_SLICED_OBJECT_DOWNLOAD_COMPONENT_SIZE))
1971
1972 max_components = config.getint(
1973 'GSUtil', 'sliced_object_download_max_components',
1974 DEFAULT_SLICED_OBJECT_DOWNLOAD_MAX_COMPONENTS)
1975
1976 num_components, component_size = _GetPartitionInfo(
1977 src_obj_metadata.size, max_components, sliced_download_component_size)
1978
1979 components_to_download = []
1980 component_lengths = []
1981 for i in range(num_components):
1982 start_byte = i * component_size
1983 end_byte = min((i + 1) * (component_size) - 1, src_obj_metadata.size - 1)
1984 component_lengths.append(end_byte - start_byte + 1)
1985 components_to_download.append(
1986 PerformSlicedDownloadObjectToFileArgs(
1987 i, src_url, src_obj_metadata, dst_url, download_file_name,
1988 start_byte, end_byte))
1989 return components_to_download, component_lengths
1990
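The byte-range arithmetic above is easy to check in isolation. A short self-contained sketch with made-up sizes (in the real code the component count and size come from _GetPartitionInfo and boto config):

def byte_ranges(object_size, num_components, component_size):
    """Yields inclusive (start_byte, end_byte) pairs that are mutually
    exclusive and collectively cover [0, object_size - 1]."""
    for i in range(num_components):
        start = i * component_size
        end = min((i + 1) * component_size - 1, object_size - 1)
        yield start, end

# A 10 MiB object split into 3 components of ceil(10 MiB / 3) bytes each:
size = 10 * 1024 * 1024
print(list(byte_ranges(size, 3, (size + 2) // 3)))
# [(0, 3495253), (3495254, 6990507), (6990508, 10485759)]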
1991
1992 def _DoSlicedDownload(src_url, src_obj_metadata, dst_url, download_file_name,
1993 command_obj, logger, copy_exception_handler,
1994 api_selector):
1995 """Downloads a cloud object to a local file using sliced download.
1996
1997 Byte ranges are decided for each thread/process, and then the parts are
1998 downloaded in parallel.
1999
2000 Args:
2001 src_url: Source CloudUrl.
2002 src_obj_metadata: Metadata from the source object.
2003 dst_url: Destination FileUrl.
2004 download_file_name: Temporary file name to be used for download.
 2005     command_obj: command object for use in Apply for sliced object downloads.
2006 logger: for outputting log messages.
2007 copy_exception_handler: For handling copy exceptions during Apply.
2008 api_selector: The Cloud API implementation used.
2009
2010 Returns:
2011 (bytes_transferred, crc32c)
2012 bytes_transferred: Number of bytes transferred from server this call.
2013 crc32c: a crc32c hash value (integer) for the downloaded bytes, or None if
2014 crc32c hashing wasn't performed.
2015 """
2016 components_to_download, component_lengths = _PartitionObject(
2017 src_url, src_obj_metadata, dst_url, download_file_name)
2018
2019 num_components = len(components_to_download)
2020 _MaintainSlicedDownloadTrackerFiles(src_obj_metadata, dst_url,
2021 download_file_name, logger,
2022 api_selector, num_components)
2023
2024 # Resize the download file so each child process can seek to its start byte.
2025 with open(download_file_name, 'ab') as fp:
2026 fp.truncate(src_obj_metadata.size)
2027
2028 cp_results = command_obj.Apply(
2029 _PerformSlicedDownloadObjectToFile, components_to_download,
2030 copy_exception_handler, arg_checker=gslib.command.DummyArgChecker,
2031 parallel_operations_override=True, should_return_results=True)
2032
2033 if len(cp_results) < num_components:
2034 raise CommandException(
2035 'Some components of %s were not downloaded successfully. '
2036 'Please retry this download.' % dst_url.object_name)
2037
2038 # Crc32c hashes have to be concatenated in the correct order.
2039 cp_results = sorted(cp_results, key=attrgetter('component_num'))
2040 crc32c = cp_results[0].crc32c
2041 if crc32c is not None:
2042 for i in range(1, num_components):
2043 crc32c = ConcatCrc32c(crc32c, cp_results[i].crc32c,
2044 component_lengths[i])
2045
2046 bytes_transferred = 0
2047 expect_gzip = (src_obj_metadata.contentEncoding and
2048 src_obj_metadata.contentEncoding.lower().endswith('gzip'))
2049 for cp_result in cp_results:
2050 bytes_transferred += cp_result.bytes_transferred
2051 server_gzip = (cp_result.server_encoding and
2052 cp_result.server_encoding.lower().endswith('gzip'))
2053 # If the server gzipped any components on the fly, we will have no chance of
2054 # properly reconstructing the file.
2055 if server_gzip and not expect_gzip:
2056 raise CommandException(
2057 'Download of %s failed because the server sent back data with an '
2058 'unexpected encoding.' % dst_url.object_name)
2059
2060 return bytes_transferred, crc32c
2061
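The truncate call above pre-sizes the destination file so each child process can open it and seek straight to its start byte without racing to extend the file. A self-contained illustration of that pattern using a throwaway temp file (the size and offset are made up):

import os
import tempfile

object_size = 1024 * 1024                       # Hypothetical 1 MiB object.
tmp = tempfile.NamedTemporaryFile(delete=False)
tmp.close()

# Grow the file to its final size up front so concurrent writers can seek
# anywhere inside it without needing to append in order.
with open(tmp.name, 'ab') as fp:
    fp.truncate(object_size)

# A worker can now open the same file and write only its own byte range.
with open(tmp.name, 'r+b') as fp:
    fp.seek(object_size // 2)
    fp.write(b'component data')

print(os.path.getsize(tmp.name))                # 1048576
os.unlink(tmp.name)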
2062
2063 def _DownloadObjectToFileResumable(src_url, src_obj_metadata, dst_url,
2064 download_file_name, gsutil_api, logger,
2065 digesters, component_num=None, start_byte=0,
2066 end_byte=None):
2067 """Downloads an object to a local file using the resumable strategy.
2068
2069 Args:
2070 src_url: Source CloudUrl.
2071 src_obj_metadata: Metadata from the source object.
2072 dst_url: Destination FileUrl.
2073 download_file_name: Temporary file name to be used for download.
2074 gsutil_api: gsutil Cloud API instance to use for the download.
2075 logger: for outputting log messages.
2076 digesters: Digesters corresponding to the hash algorithms that will be used
2077 for validation.
2078 component_num: Which component of a sliced download this call is for, or
2079 None if this is not a sliced download.
2080 start_byte: The first byte of a byte range for a sliced download.
2081 end_byte: The last byte of a byte range for a sliced download.
2082
2083 Returns:
2084 (bytes_transferred, server_encoding)
2085 bytes_transferred: Number of bytes transferred from server this call.
2086 server_encoding: Content-encoding string if it was detected that the server
2087 sent encoded bytes during transfer, None otherwise.
2088 """
2089 if end_byte is None:
2090 end_byte = src_obj_metadata.size - 1
2091 download_size = end_byte - start_byte + 1
2092
2093 is_sliced = component_num is not None
2094 api_selector = gsutil_api.GetApiSelector(provider=src_url.scheme)
1728 server_encoding = None 2095 server_encoding = None
1729 download_complete = False 2096
1730 download_strategy = _SelectDownloadStrategy(dst_url) 2097 # Used for logging
1731 download_start_point = 0 2098 download_name = dst_url.object_name
1732 # This is used for resuming downloads, but also for passing the mediaLink 2099 if is_sliced:
1733 # and size into the download for new downloads so that we can avoid 2100 download_name += ' component %d' % component_num
1734 # making an extra HTTP call. 2101
1735 serialization_data = None
1736 serialization_dict = GetDownloadSerializationDict(src_obj_metadata)
1737 open_files = []
1738 try: 2102 try:
1739 if download_strategy is CloudApi.DownloadStrategy.ONE_SHOT: 2103 fp = open(download_file_name, 'r+b')
1740 fp = open(download_file_name, 'wb') 2104 fp.seek(start_byte)
1741 elif download_strategy is CloudApi.DownloadStrategy.RESUMABLE: 2105 api_selector = gsutil_api.GetApiSelector(provider=src_url.scheme)
1742 # If this is a resumable download, we need to open the file for append and 2106 existing_file_size = GetFileSize(fp)
1743 # manage a tracker file. 2107
1744 if open_files_map.get(download_file_name, False): 2108 tracker_file_name, download_start_byte = (
1745 # Ensure another process/thread is not already writing to this file. 2109 ReadOrCreateDownloadTrackerFile(src_obj_metadata, dst_url, logger,
1746 raise FileConcurrencySkipError 2110 api_selector, start_byte,
1747 open_files.append(download_file_name) 2111 existing_file_size, component_num))
1748 open_files_map[download_file_name] = True 2112
1749 fp = open(download_file_name, 'ab') 2113 if download_start_byte < start_byte or download_start_byte > end_byte + 1:
1750 2114 DeleteTrackerFile(tracker_file_name)
1751 resuming = ReadOrCreateDownloadTrackerFile( 2115 raise CommandException(
1752 src_obj_metadata, dst_url, api_selector) 2116 'Resumable download start point for %s is not in the correct byte '
1753 if resuming: 2117 'range. Deleting tracker file, so if you re-try this download it '
1754 # Find out how far along we are so we can request the appropriate 2118 'will start from scratch' % download_name)
1755 # remaining range of the object. 2119
1756 existing_file_size = GetFileSize(fp, position_to_eof=True) 2120 download_complete = (download_start_byte == start_byte + download_size)
1757 if existing_file_size > src_obj_metadata.size: 2121 resuming = (download_start_byte != start_byte) and not download_complete
1758 DeleteTrackerFile(GetTrackerFilePath( 2122 if resuming:
1759 dst_url, TrackerFileType.DOWNLOAD, api_selector)) 2123 logger.info('Resuming download for %s', download_name)
1760 raise CommandException( 2124 elif download_complete:
1761 '%s is larger (%d) than %s (%d).\nDeleting tracker file, so ' 2125 logger.info(
1762 'if you re-try this download it will start from scratch' % 2126 'Download already complete for %s, skipping download but '
1763 (download_file_name, existing_file_size, src_url.object_name, 2127 'will run integrity checks.', download_name)
1764 src_obj_metadata.size)) 2128
1765 else: 2129 # This is used for resuming downloads, but also for passing the mediaLink
1766 if existing_file_size == src_obj_metadata.size: 2130 # and size into the download for new downloads so that we can avoid
1767 logger.info( 2131 # making an extra HTTP call.
1768 'Download already complete for file %s, skipping download but ' 2132 serialization_data = GetDownloadSerializationData(
1769 'will run integrity checks.', download_file_name) 2133 src_obj_metadata, progress=download_start_byte)
1770 download_complete = True 2134
1771 else: 2135 if resuming or download_complete:
1772 download_start_point = existing_file_size 2136 # Catch up our digester with the hash data.
1773 serialization_dict['progress'] = download_start_point 2137 bytes_digested = 0
1774 logger.info('Resuming download for %s', src_url.url_string) 2138 total_bytes_to_digest = download_start_byte - start_byte
1775 # Catch up our digester with the hash data. 2139 hash_callback = ProgressCallbackWithBackoff(
1776 if existing_file_size > TEN_MIB: 2140 total_bytes_to_digest,
1777 for alg_name in digesters: 2141 FileProgressCallbackHandler(
1778 logger.info( 2142 ConstructAnnounceText('Hashing',
1779 'Catching up %s for %s', alg_name, download_file_name) 2143 dst_url.url_string), logger).call)
1780 with open(download_file_name, 'rb') as hash_fp: 2144
1781 while True: 2145 while bytes_digested < total_bytes_to_digest:
1782 data = hash_fp.read(DEFAULT_FILE_BUFFER_SIZE) 2146 bytes_to_read = min(DEFAULT_FILE_BUFFER_SIZE,
1783 if not data: 2147 total_bytes_to_digest - bytes_digested)
1784 break 2148 data = fp.read(bytes_to_read)
1785 for alg_name in digesters: 2149 bytes_digested += bytes_to_read
1786 digesters[alg_name].update(data) 2150 for alg_name in digesters:
1787 else: 2151 digesters[alg_name].update(data)
1788 # Starting a new download, blow away whatever is already there. 2152 hash_callback.Progress(len(data))
1789 fp.truncate(0) 2153
1790 fp.seek(0) 2154 elif not is_sliced:
1791 2155 # Delete file contents and start entire object download from scratch.
1792 else: 2156 fp.truncate(0)
1793 raise CommandException('Invalid download strategy %s chosen for' 2157 existing_file_size = 0
1794 'file %s' % (download_strategy, fp.name))
1795
1796 if not dst_url.IsStream():
1797 serialization_data = json.dumps(serialization_dict)
1798 2158
1799 progress_callback = FileProgressCallbackHandler( 2159 progress_callback = FileProgressCallbackHandler(
1800 ConstructAnnounceText('Downloading', dst_url.url_string), 2160 ConstructAnnounceText('Downloading', dst_url.url_string), logger,
1801 logger).call 2161 start_byte, download_size).call
2162
1802 if global_copy_helper_opts.test_callback_file: 2163 if global_copy_helper_opts.test_callback_file:
1803 with open(global_copy_helper_opts.test_callback_file, 'rb') as test_fp: 2164 with open(global_copy_helper_opts.test_callback_file, 'rb') as test_fp:
1804 progress_callback = pickle.loads(test_fp.read()).call 2165 progress_callback = pickle.loads(test_fp.read()).call
1805 2166
1806 start_time = time.time() 2167 if is_sliced and src_obj_metadata.size >= ResumableThreshold():
2168 fp = SlicedDownloadFileWrapper(fp, tracker_file_name, src_obj_metadata,
2169 start_byte, end_byte)
2170
1807 # TODO: With gzip encoding (which may occur on-the-fly and not be part of 2171 # TODO: With gzip encoding (which may occur on-the-fly and not be part of
1808 # the object's metadata), when we request a range to resume, it's possible 2172 # the object's metadata), when we request a range to resume, it's possible
1809 # that the server will just resend the entire object, which means our 2173 # that the server will just resend the entire object, which means our
1810 # caught-up hash will be incorrect. We recalculate the hash on 2174 # caught-up hash will be incorrect. We recalculate the hash on
1811 # the local file in the case of a failed gzip hash anyway, but it would 2175 # the local file in the case of a failed gzip hash anyway, but it would
1812 # be better if we actively detected this case. 2176 # be better if we actively detected this case.
1813 if not download_complete: 2177 if not download_complete:
2178 fp.seek(download_start_byte)
1814 server_encoding = gsutil_api.GetObjectMedia( 2179 server_encoding = gsutil_api.GetObjectMedia(
1815 src_url.bucket_name, src_url.object_name, fp, 2180 src_url.bucket_name, src_url.object_name, fp,
1816 start_byte=download_start_point, generation=src_url.generation, 2181 start_byte=download_start_byte, end_byte=end_byte,
1817 object_size=src_obj_metadata.size, 2182 generation=src_url.generation, object_size=src_obj_metadata.size,
1818 download_strategy=download_strategy, provider=src_url.scheme, 2183 download_strategy=CloudApi.DownloadStrategy.RESUMABLE,
1819 serialization_data=serialization_data, digesters=digesters, 2184 provider=src_url.scheme, serialization_data=serialization_data,
1820 progress_callback=progress_callback) 2185 digesters=digesters, progress_callback=progress_callback)
1821
1822 end_time = time.time()
1823
1824 # If a custom test method is defined, call it here. For the copy command,
1825 # test methods are expected to take one argument: an open file pointer,
1826 # and are used to perturb the open file during download to exercise
1827 # download error detection.
1828 if test_method:
1829 test_method(fp)
1830 2186
1831 except ResumableDownloadException as e: 2187 except ResumableDownloadException as e:
1832 logger.warning('Caught ResumableDownloadException (%s) for file %s.', 2188 logger.warning('Caught ResumableDownloadException (%s) for download of %s.',
1833 e.reason, file_name) 2189 e.reason, download_name)
1834 raise 2190 raise
1835
1836 finally: 2191 finally:
1837 if fp: 2192 if fp:
1838 fp.close() 2193 fp.close()
1839 for file_name in open_files:
1840 open_files_map.delete(file_name)
1841 2194
1842 # If we decompressed a content-encoding gzip file on the fly, this may not 2195 bytes_transferred = end_byte - download_start_byte + 1
1843 # be accurate, but it is the best we can do without going deep into the 2196 return bytes_transferred, server_encoding
1844 # underlying HTTP libraries. Note that this value is only used for 2197
1845 # reporting in log messages; inaccuracy doesn't impact the integrity of the 2198
1846 # download. 2199 def _DownloadObjectToFileNonResumable(src_url, src_obj_metadata, dst_url,
1847 bytes_transferred = src_obj_metadata.size - download_start_point 2200 download_file_name, gsutil_api, logger,
2201 digesters):
2202 """Downloads an object to a local file using the non-resumable strategy.
2203
2204 Args:
2205 src_url: Source CloudUrl.
2206 src_obj_metadata: Metadata from the source object.
2207 dst_url: Destination FileUrl.
2208 download_file_name: Temporary file name to be used for download.
2209 gsutil_api: gsutil Cloud API instance to use for the download.
2210 logger: for outputting log messages.
2211 digesters: Digesters corresponding to the hash algorithms that will be used
2212 for validation.
2213 Returns:
2214 (bytes_transferred, server_encoding)
2215 bytes_transferred: Number of bytes transferred from server this call.
2216 server_encoding: Content-encoding string if it was detected that the server
2217 sent encoded bytes during transfer, None otherwise.
2218 """
2219 try:
2220 fp = open(download_file_name, 'w')
2221
2222 # This is used to pass the mediaLink and the size into the download so that
2223 # we can avoid making an extra HTTP call.
2224 serialization_data = GetDownloadSerializationData(src_obj_metadata)
2225
2226 progress_callback = FileProgressCallbackHandler(
2227 ConstructAnnounceText('Downloading', dst_url.url_string), logger).call
2228
2229 if global_copy_helper_opts.test_callback_file:
2230 with open(global_copy_helper_opts.test_callback_file, 'rb') as test_fp:
2231 progress_callback = pickle.loads(test_fp.read()).call
2232
2233 server_encoding = gsutil_api.GetObjectMedia(
2234 src_url.bucket_name, src_url.object_name, fp,
2235 generation=src_url.generation, object_size=src_obj_metadata.size,
2236 download_strategy=CloudApi.DownloadStrategy.ONE_SHOT,
2237 provider=src_url.scheme, serialization_data=serialization_data,
2238 digesters=digesters, progress_callback=progress_callback)
2239 finally:
2240 if fp:
2241 fp.close()
2242
2243 return src_obj_metadata.size, server_encoding
2244
2245
2246 def _DownloadObjectToFile(src_url, src_obj_metadata, dst_url,
2247 gsutil_api, logger, command_obj,
2248 copy_exception_handler, allow_splitting=True):
2249 """Downloads an object to a local file.
2250
2251 Args:
2252 src_url: Source CloudUrl.
2253 src_obj_metadata: Metadata from the source object.
2254 dst_url: Destination FileUrl.
2255 gsutil_api: gsutil Cloud API instance to use for the download.
2256 logger: for outputting log messages.
2257 command_obj: command object for use in Apply in sliced downloads.
2258 copy_exception_handler: For handling copy exceptions during Apply.
2259 allow_splitting: Whether or not to allow sliced download.
2260 Returns:
2261 (elapsed_time, bytes_transferred, dst_url, md5), where time elapsed
2262 excludes initial GET.
2263
2264 Raises:
2265 FileConcurrencySkipError: if this download is already in progress.
2266 CommandException: if other errors encountered.
2267 """
2268 global open_files_map, open_files_lock
2269 if dst_url.object_name.endswith(dst_url.delim):
2270 logger.warn('\n'.join(textwrap.wrap(
2271 'Skipping attempt to download to filename ending with slash (%s). This '
2272 'typically happens when using gsutil to download from a subdirectory '
2273 'created by the Cloud Console (https://cloud.google.com/console)'
2274 % dst_url.object_name)))
2275 return (0, 0, dst_url, '')
2276
2277 api_selector = gsutil_api.GetApiSelector(provider=src_url.scheme)
2278 download_strategy = _SelectDownloadStrategy(dst_url)
2279 sliced_download = _ShouldDoSlicedDownload(
2280 download_strategy, src_obj_metadata, allow_splitting, logger)
2281
2282 download_file_name, need_to_unzip = _GetDownloadFile(
2283 dst_url, src_obj_metadata, logger)
2284
2285 # Ensure another process/thread is not already writing to this file.
2286 with open_files_lock:
2287 if open_files_map.get(download_file_name, False):
2288 raise FileConcurrencySkipError
2289 open_files_map[download_file_name] = True
2290
2291 # Set up hash digesters.
2292 consider_md5 = src_obj_metadata.md5Hash and not sliced_download
2293 hash_algs = GetDownloadHashAlgs(logger, consider_md5=consider_md5,
2294 consider_crc32c=src_obj_metadata.crc32c)
2295 digesters = dict((alg, hash_algs[alg]()) for alg in hash_algs or {})
2296
2297 # Tracks whether the server used a gzip encoding.
2298 server_encoding = None
2299 download_complete = (src_obj_metadata.size == 0)
2300 bytes_transferred = 0
2301
2302 start_time = time.time()
2303 if not download_complete:
2304 if sliced_download:
2305 (bytes_transferred, crc32c) = (
2306 _DoSlicedDownload(src_url, src_obj_metadata, dst_url,
2307 download_file_name, command_obj, logger,
2308 copy_exception_handler, api_selector))
2309 if 'crc32c' in digesters:
2310 digesters['crc32c'].crcValue = crc32c
2311 elif download_strategy is CloudApi.DownloadStrategy.ONE_SHOT:
2312 (bytes_transferred, server_encoding) = (
2313 _DownloadObjectToFileNonResumable(src_url, src_obj_metadata, dst_url,
2314 download_file_name, gsutil_api,
2315 logger, digesters))
2316 elif download_strategy is CloudApi.DownloadStrategy.RESUMABLE:
2317 (bytes_transferred, server_encoding) = (
2318 _DownloadObjectToFileResumable(src_url, src_obj_metadata, dst_url,
2319 download_file_name, gsutil_api, logger,
2320 digesters))
2321 else:
 2322       raise CommandException('Invalid download strategy %s chosen for '
2323 'file %s' % (download_strategy,
2324 download_file_name))
2325 end_time = time.time()
2326
1848 server_gzip = server_encoding and server_encoding.lower().endswith('gzip') 2327 server_gzip = server_encoding and server_encoding.lower().endswith('gzip')
1849 local_md5 = _ValidateDownloadHashes(logger, src_url, src_obj_metadata, 2328 local_md5 = _ValidateAndCompleteDownload(
1850 dst_url, need_to_unzip, server_gzip, 2329 logger, src_url, src_obj_metadata, dst_url, need_to_unzip, server_gzip,
1851 digesters, hash_algs, api_selector, 2330 digesters, hash_algs, download_file_name, api_selector, bytes_transferred)
1852 bytes_transferred) 2331
2332 with open_files_lock:
2333 open_files_map.delete(download_file_name)
1853 2334
1854 return (end_time - start_time, bytes_transferred, dst_url, local_md5) 2335 return (end_time - start_time, bytes_transferred, dst_url, local_md5)
1855 2336
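The open_files_map bookkeeping above is a claim-then-release guard around the temporary download file. A minimal sketch of the same pattern; the dict, lock, and exception here are local stand-ins for the module-level objects used above (note that the real map exposes a .delete method rather than plain dict semantics):

import threading

class FileConcurrencySkipError(Exception):
    """Raised when another worker is already downloading to this file."""

open_files = {}
open_files_guard = threading.Lock()

def claim(download_file_name):
    with open_files_guard:
        if open_files.get(download_file_name, False):
            raise FileConcurrencySkipError(download_file_name)
        open_files[download_file_name] = True

def release(download_file_name):
    with open_files_guard:
        open_files.pop(download_file_name, None)

claim('/tmp/example_.gstmp')
try:
    pass  # ... perform the download and validation here ...
finally:
    release('/tmp/example_.gstmp')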
1856 2337
1857 def _GetDownloadZipFileName(file_name): 2338 def _GetDownloadTempZipFileName(dst_url):
1858 """Returns the file name for a temporarily compressed downloaded file.""" 2339 """Returns temporary file name for a temporarily compressed download."""
1859 return '%s_.gztmp' % file_name 2340 return '%s_.gztmp' % dst_url.object_name
1860 2341
1861 2342
1862 def _ValidateDownloadHashes(logger, src_url, src_obj_metadata, dst_url, 2343 def _GetDownloadTempFileName(dst_url):
1863 need_to_unzip, server_gzip, digesters, hash_algs, 2344 """Returns temporary download file name for uncompressed downloads."""
1864 api_selector, bytes_transferred): 2345 return '%s_.gstmp' % dst_url.object_name
1865 """Validates a downloaded file's integrity. 2346
2347
2348 def _ValidateAndCompleteDownload(logger, src_url, src_obj_metadata, dst_url,
2349 need_to_unzip, server_gzip, digesters,
2350 hash_algs, download_file_name,
2351 api_selector, bytes_transferred):
2352 """Validates and performs necessary operations on a downloaded file.
2353
2354 Validates the integrity of the downloaded file using hash_algs. If the file
2355 was compressed (temporarily), the file will be decompressed. Then, if the
2356 integrity of the file was successfully validated, the file will be moved
2357 from its temporary download location to its permanent location on disk.
1866 2358
1867 Args: 2359 Args:
1868 logger: For outputting log messages. 2360 logger: For outputting log messages.
1869 src_url: StorageUrl for the source object. 2361 src_url: StorageUrl for the source object.
1870 src_obj_metadata: Metadata for the source object, potentially containing 2362 src_obj_metadata: Metadata for the source object, potentially containing
1871 hash values. 2363 hash values.
1872 dst_url: StorageUrl describing the destination file. 2364 dst_url: StorageUrl describing the destination file.
1873 need_to_unzip: If true, a temporary zip file was used and must be 2365 need_to_unzip: If true, a temporary zip file was used and must be
1874 uncompressed as part of validation. 2366 uncompressed as part of validation.
1875 server_gzip: If true, the server gzipped the bytes (regardless of whether 2367 server_gzip: If true, the server gzipped the bytes (regardless of whether
1876 the object metadata claimed it was gzipped). 2368 the object metadata claimed it was gzipped).
1877 digesters: dict of {string, hash digester} that contains up-to-date digests 2369 digesters: dict of {string, hash digester} that contains up-to-date digests
1878 computed during the download. If a digester for a particular 2370 computed during the download. If a digester for a particular
1879 algorithm is None, an up-to-date digest is not available and the 2371 algorithm is None, an up-to-date digest is not available and the
1880 hash must be recomputed from the local file. 2372 hash must be recomputed from the local file.
1881 hash_algs: dict of {string, hash algorithm} that can be used if digesters 2373 hash_algs: dict of {string, hash algorithm} that can be used if digesters
1882 don't have up-to-date digests. 2374 don't have up-to-date digests.
2375 download_file_name: Temporary file name that was used for download.
 1883     api_selector: The Cloud API implementation used (used for tracker file naming). 2376     api_selector: The Cloud API implementation used (used for tracker file naming).
1884 bytes_transferred: Number of bytes downloaded (used for logging). 2377 bytes_transferred: Number of bytes downloaded (used for logging).
1885 2378
1886 Returns: 2379 Returns:
1887 An MD5 of the local file, if one was calculated as part of the integrity 2380 An MD5 of the local file, if one was calculated as part of the integrity
1888 check. 2381 check.
1889 """ 2382 """
1890 file_name = dst_url.object_name 2383 final_file_name = dst_url.object_name
1891 download_file_name = (_GetDownloadZipFileName(file_name) if need_to_unzip else 2384 file_name = download_file_name
1892 file_name)
1893 digesters_succeeded = True 2385 digesters_succeeded = True
2386
1894 for alg in digesters: 2387 for alg in digesters:
1895 # If we get a digester with a None algorithm, the underlying 2388 # If we get a digester with a None algorithm, the underlying
1896 # implementation failed to calculate a digest, so we will need to 2389 # implementation failed to calculate a digest, so we will need to
1897 # calculate one from scratch. 2390 # calculate one from scratch.
1898 if not digesters[alg]: 2391 if not digesters[alg]:
1899 digesters_succeeded = False 2392 digesters_succeeded = False
1900 break 2393 break
1901 2394
1902 if digesters_succeeded: 2395 if digesters_succeeded:
1903 local_hashes = _CreateDigestsFromDigesters(digesters) 2396 local_hashes = _CreateDigestsFromDigesters(digesters)
1904 else: 2397 else:
1905 local_hashes = _CreateDigestsFromLocalFile( 2398 local_hashes = _CreateDigestsFromLocalFile(
1906 logger, hash_algs, download_file_name, src_obj_metadata) 2399 logger, hash_algs, file_name, final_file_name, src_obj_metadata)
1907 2400
1908 digest_verified = True 2401 digest_verified = True
1909 hash_invalid_exception = None 2402 hash_invalid_exception = None
1910 try: 2403 try:
1911 _CheckHashes(logger, src_url, src_obj_metadata, download_file_name, 2404 _CheckHashes(logger, src_url, src_obj_metadata, final_file_name,
1912 local_hashes) 2405 local_hashes)
1913 DeleteTrackerFile(GetTrackerFilePath( 2406 DeleteDownloadTrackerFiles(dst_url, api_selector)
1914 dst_url, TrackerFileType.DOWNLOAD, api_selector))
1915 except HashMismatchException, e: 2407 except HashMismatchException, e:
 1916   # If a non-gzipped object gets sent with gzip content encoding, the hash 2408   # If a non-gzipped object gets sent with gzip content encoding, the hash
1917 # we calculate will match the gzipped bytes, not the original object. Thus, 2409 # we calculate will match the gzipped bytes, not the original object. Thus,
1918 # we'll need to calculate and check it after unzipping. 2410 # we'll need to calculate and check it after unzipping.
1919 if server_gzip: 2411 if server_gzip:
1920 logger.debug( 2412 logger.debug(
1921 'Hash did not match but server gzipped the content, will ' 2413 'Hash did not match but server gzipped the content, will '
1922 'recalculate.') 2414 'recalculate.')
1923 digest_verified = False 2415 digest_verified = False
1924 elif api_selector == ApiSelector.XML: 2416 elif api_selector == ApiSelector.XML:
1925 logger.debug( 2417 logger.debug(
1926 'Hash did not match but server may have gzipped the content, will ' 2418 'Hash did not match but server may have gzipped the content, will '
1927 'recalculate.') 2419 'recalculate.')
1928 # Save off the exception in case this isn't a gzipped file. 2420 # Save off the exception in case this isn't a gzipped file.
1929 hash_invalid_exception = e 2421 hash_invalid_exception = e
1930 digest_verified = False 2422 digest_verified = False
1931 else: 2423 else:
1932 DeleteTrackerFile(GetTrackerFilePath( 2424 DeleteDownloadTrackerFiles(dst_url, api_selector)
1933 dst_url, TrackerFileType.DOWNLOAD, api_selector))
1934 if _RENAME_ON_HASH_MISMATCH: 2425 if _RENAME_ON_HASH_MISMATCH:
1935 os.rename(download_file_name, 2426 os.rename(file_name,
1936 download_file_name + _RENAME_ON_HASH_MISMATCH_SUFFIX) 2427 final_file_name + _RENAME_ON_HASH_MISMATCH_SUFFIX)
1937 else: 2428 else:
1938 os.unlink(download_file_name) 2429 os.unlink(file_name)
1939 raise 2430 raise
1940 2431
1941 if server_gzip and not need_to_unzip:
1942 # Server compressed bytes on-the-fly, thus we need to rename and decompress.
1943 # We can't decompress on-the-fly because prior to Python 3.2 the gzip
1944 # module makes a bunch of seek calls on the stream.
1945 download_file_name = _GetDownloadZipFileName(file_name)
1946 os.rename(file_name, download_file_name)
1947
1948 if need_to_unzip or server_gzip: 2432 if need_to_unzip or server_gzip:
1949 # Log that we're uncompressing if the file is big enough that 2433 # Log that we're uncompressing if the file is big enough that
1950 # decompressing would make it look like the transfer "stalled" at the end. 2434 # decompressing would make it look like the transfer "stalled" at the end.
1951 if bytes_transferred > TEN_MIB: 2435 if bytes_transferred > TEN_MIB:
1952 logger.info( 2436 logger.info(
1953 'Uncompressing downloaded tmp file to %s...', file_name) 2437 'Uncompressing temporarily gzipped file to %s...', final_file_name)
1954 2438
1955 # Downloaded gzipped file to a filename w/o .gz extension, so unzip.
1956 gzip_fp = None 2439 gzip_fp = None
1957 try: 2440 try:
1958 gzip_fp = gzip.open(download_file_name, 'rb') 2441 # Downloaded temporarily gzipped file, unzip to file without '_.gztmp'
1959 with open(file_name, 'wb') as f_out: 2442 # suffix.
2443 gzip_fp = gzip.open(file_name, 'rb')
2444 with open(final_file_name, 'wb') as f_out:
1960 data = gzip_fp.read(GZIP_CHUNK_SIZE) 2445 data = gzip_fp.read(GZIP_CHUNK_SIZE)
1961 while data: 2446 while data:
1962 f_out.write(data) 2447 f_out.write(data)
1963 data = gzip_fp.read(GZIP_CHUNK_SIZE) 2448 data = gzip_fp.read(GZIP_CHUNK_SIZE)
1964 except IOError, e: 2449 except IOError, e:
1965 # In the XML case where we don't know if the file was gzipped, raise 2450 # In the XML case where we don't know if the file was gzipped, raise
1966 # the original hash exception if we find that it wasn't. 2451 # the original hash exception if we find that it wasn't.
1967 if 'Not a gzipped file' in str(e) and hash_invalid_exception: 2452 if 'Not a gzipped file' in str(e) and hash_invalid_exception:
1968 # Linter improperly thinks we're raising None despite the above check. 2453 # Linter improperly thinks we're raising None despite the above check.
1969 # pylint: disable=raising-bad-type 2454 # pylint: disable=raising-bad-type
1970 raise hash_invalid_exception 2455 raise hash_invalid_exception
1971 finally: 2456 finally:
1972 if gzip_fp: 2457 if gzip_fp:
1973 gzip_fp.close() 2458 gzip_fp.close()
1974 2459
1975 os.unlink(download_file_name) 2460 os.unlink(file_name)
2461 file_name = final_file_name
1976 2462
1977 if not digest_verified: 2463 if not digest_verified:
1978 try: 2464 try:
1979 # Recalculate hashes on the unzipped local file. 2465 # Recalculate hashes on the unzipped local file.
1980 local_hashes = _CreateDigestsFromLocalFile(logger, hash_algs, file_name, 2466 local_hashes = _CreateDigestsFromLocalFile(
1981 src_obj_metadata) 2467 logger, hash_algs, file_name, final_file_name, src_obj_metadata)
1982 _CheckHashes(logger, src_url, src_obj_metadata, file_name, local_hashes) 2468 _CheckHashes(logger, src_url, src_obj_metadata, final_file_name,
1983 DeleteTrackerFile(GetTrackerFilePath( 2469 local_hashes)
1984 dst_url, TrackerFileType.DOWNLOAD, api_selector)) 2470 DeleteDownloadTrackerFiles(dst_url, api_selector)
1985 except HashMismatchException: 2471 except HashMismatchException:
1986 DeleteTrackerFile(GetTrackerFilePath( 2472 DeleteDownloadTrackerFiles(dst_url, api_selector)
1987 dst_url, TrackerFileType.DOWNLOAD, api_selector))
1988 if _RENAME_ON_HASH_MISMATCH: 2473 if _RENAME_ON_HASH_MISMATCH:
1989 os.rename(file_name, 2474 os.rename(file_name,
1990 file_name + _RENAME_ON_HASH_MISMATCH_SUFFIX) 2475 file_name + _RENAME_ON_HASH_MISMATCH_SUFFIX)
1991 else: 2476 else:
1992 os.unlink(file_name) 2477 os.unlink(file_name)
1993 raise 2478 raise
1994 2479
2480 if file_name != final_file_name:
2481 # Data is still in a temporary file, so move it to a permanent location.
2482 if os.path.exists(final_file_name):
2483 os.unlink(final_file_name)
2484 os.rename(file_name,
2485 final_file_name)
2486
1995 if 'md5' in local_hashes: 2487 if 'md5' in local_hashes:
1996 return local_hashes['md5'] 2488 return local_hashes['md5']
1997 2489
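The chunked gzip loop above can be exercised standalone. A small sketch with a synthetic temporarily-gzipped file, an arbitrary chunk size standing in for GZIP_CHUNK_SIZE, and made-up file names:

import gzip
import os
import shutil
import tempfile

chunk_size = 8192                      # Stand-in for GZIP_CHUNK_SIZE.
tmp_dir = tempfile.mkdtemp()
gz_name = os.path.join(tmp_dir, 'example_.gztmp')
final_name = os.path.join(tmp_dir, 'example.txt')

with gzip.open(gz_name, 'wb') as gz:   # Fake "temporarily gzipped" download.
    gz.write(b'hello world\n' * 1000)

# Stream-decompress to the final name in fixed-size chunks, then drop the
# temporary file -- the same shape as the loop above.
with gzip.open(gz_name, 'rb') as gz, open(final_name, 'wb') as out:
    data = gz.read(chunk_size)
    while data:
        out.write(data)
        data = gz.read(chunk_size)
os.unlink(gz_name)

print(os.path.getsize(final_name))     # 12000
shutil.rmtree(tmp_dir)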
1998 2490
1999 def _CopyFileToFile(src_url, dst_url): 2491 def _CopyFileToFile(src_url, dst_url):
2000 """Copies a local file to a local file. 2492 """Copies a local file to a local file.
2001 2493
2002 Args: 2494 Args:
2003 src_url: Source FileUrl. 2495 src_url: Source FileUrl.
2004 dst_url: Destination FileUrl. 2496 dst_url: Destination FileUrl.
(...skipping 111 matching lines...)
2116 result_url, uploaded_object.generation) 2608 result_url, uploaded_object.generation)
2117 2609
2118 return (end_time - start_time, src_obj_metadata.size, result_url, 2610 return (end_time - start_time, src_obj_metadata.size, result_url,
2119 uploaded_object.md5Hash) 2611 uploaded_object.md5Hash)
2120 2612
2121 2613
2122 # pylint: disable=undefined-variable 2614 # pylint: disable=undefined-variable
2123 # pylint: disable=too-many-statements 2615 # pylint: disable=too-many-statements
2124 def PerformCopy(logger, src_url, dst_url, gsutil_api, command_obj, 2616 def PerformCopy(logger, src_url, dst_url, gsutil_api, command_obj,
2125 copy_exception_handler, allow_splitting=True, 2617 copy_exception_handler, allow_splitting=True,
2126 headers=None, manifest=None, gzip_exts=None, test_method=None): 2618 headers=None, manifest=None, gzip_exts=None):
2127 """Performs copy from src_url to dst_url, handling various special cases. 2619 """Performs copy from src_url to dst_url, handling various special cases.
2128 2620
2129 Args: 2621 Args:
2130 logger: for outputting log messages. 2622 logger: for outputting log messages.
2131 src_url: Source StorageUrl. 2623 src_url: Source StorageUrl.
2132 dst_url: Destination StorageUrl. 2624 dst_url: Destination StorageUrl.
2133 gsutil_api: gsutil Cloud API instance to use for the copy. 2625 gsutil_api: gsutil Cloud API instance to use for the copy.
2134 command_obj: command object for use in Apply in parallel composite uploads. 2626 command_obj: command object for use in Apply in parallel composite uploads
2627 and sliced object downloads.
2135 copy_exception_handler: for handling copy exceptions during Apply. 2628 copy_exception_handler: for handling copy exceptions during Apply.
2136 allow_splitting: Whether to allow the file to be split into component 2629 allow_splitting: Whether to allow the file to be split into component
 2137       pieces for a parallel composite upload. 2630       pieces for a parallel composite upload or download.
2138 headers: optional headers to use for the copy operation. 2631 headers: optional headers to use for the copy operation.
2139 manifest: optional manifest for tracking copy operations. 2632 manifest: optional manifest for tracking copy operations.
2140 gzip_exts: List of file extensions to gzip for uploads, if any. 2633 gzip_exts: List of file extensions to gzip for uploads, if any.
2141 test_method: optional test method for modifying files during unit tests.
2142 2634
2143 Returns: 2635 Returns:
2144 (elapsed_time, bytes_transferred, version-specific dst_url) excluding 2636 (elapsed_time, bytes_transferred, version-specific dst_url) excluding
2145 overhead like initial GET. 2637 overhead like initial GET.
2146 2638
2147 Raises: 2639 Raises:
2148 ItemExistsError: if no clobber flag is specified and the destination 2640 ItemExistsError: if no clobber flag is specified and the destination
2149 object already exists. 2641 object already exists.
2150 SkipUnsupportedObjectError: if skip_unsupported_objects flag is specified 2642 SkipUnsupportedObjectError: if skip_unsupported_objects flag is specified
2151 and the source is an unsupported type. 2643 and the source is an unsupported type.
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after
2187 if global_copy_helper_opts.preserve_acl: 2679 if global_copy_helper_opts.preserve_acl:
2188 src_obj_fields.append('acl') 2680 src_obj_fields.append('acl')
2189 if (src_url.scheme == dst_url.scheme 2681 if (src_url.scheme == dst_url.scheme
2190 and not global_copy_helper_opts.daisy_chain): 2682 and not global_copy_helper_opts.daisy_chain):
2191 copy_in_the_cloud = True 2683 copy_in_the_cloud = True
2192 else: 2684 else:
2193 copy_in_the_cloud = False 2685 copy_in_the_cloud = False
2194 else: 2686 else:
2195 # Just get the fields needed to validate the download. 2687 # Just get the fields needed to validate the download.
2196 src_obj_fields = ['crc32c', 'contentEncoding', 'contentType', 'etag', 2688 src_obj_fields = ['crc32c', 'contentEncoding', 'contentType', 'etag',
2197 'mediaLink', 'md5Hash', 'size'] 2689 'mediaLink', 'md5Hash', 'size', 'generation']
2198 2690
2199 if (src_url.scheme == 's3' and 2691 if (src_url.scheme == 's3' and
2200 global_copy_helper_opts.skip_unsupported_objects): 2692 global_copy_helper_opts.skip_unsupported_objects):
2201 src_obj_fields.append('storageClass') 2693 src_obj_fields.append('storageClass')
2202 2694
2203 try: 2695 try:
2204 src_generation = GenerationFromUrlAndString(src_url, src_url.generation) 2696 src_generation = GenerationFromUrlAndString(src_url, src_url.generation)
2205 src_obj_metadata = gsutil_api.GetObjectMetadata( 2697 src_obj_metadata = gsutil_api.GetObjectMetadata(
2206 src_url.bucket_name, src_url.object_name, 2698 src_url.bucket_name, src_url.object_name,
2207 generation=src_generation, provider=src_url.scheme, 2699 generation=src_generation, provider=src_url.scheme,
(...skipping 16 matching lines...)
2224 # dst_url will be verified in _CopyObjToObjDaisyChainMode if it 2716 # dst_url will be verified in _CopyObjToObjDaisyChainMode if it
2225 # is not s3 (and thus differs from src_url). 2717 # is not s3 (and thus differs from src_url).
2226 if src_url.scheme == 's3': 2718 if src_url.scheme == 's3':
2227 acl_text = S3MarkerAclFromObjectMetadata(src_obj_metadata) 2719 acl_text = S3MarkerAclFromObjectMetadata(src_obj_metadata)
2228 if acl_text: 2720 if acl_text:
2229 AddS3MarkerAclToObjectMetadata(dst_obj_metadata, acl_text) 2721 AddS3MarkerAclToObjectMetadata(dst_obj_metadata, acl_text)
2230 else: 2722 else:
2231 try: 2723 try:
2232 src_obj_filestream = GetStreamFromFileUrl(src_url) 2724 src_obj_filestream = GetStreamFromFileUrl(src_url)
2233 except Exception, e: # pylint: disable=broad-except 2725 except Exception, e: # pylint: disable=broad-except
2234 raise CommandException('Error opening file "%s": %s.' % (src_url, 2726 if command_obj.continue_on_error:
2235 e.message)) 2727 message = 'Error copying %s: %s' % (src_url, str(e))
2728 command_obj.op_failure_count += 1
2729 logger.error(message)
2730 return
2731 else:
2732 raise CommandException('Error opening file "%s": %s.' % (src_url,
2733 e.message))
2236 if src_url.IsStream(): 2734 if src_url.IsStream():
2237 src_obj_size = None 2735 src_obj_size = None
2238 else: 2736 else:
2239 src_obj_size = os.path.getsize(src_url.object_name) 2737 src_obj_size = os.path.getsize(src_url.object_name)
2240 2738
2241 if global_copy_helper_opts.use_manifest: 2739 if global_copy_helper_opts.use_manifest:
2242 # Set the source size in the manifest. 2740 # Set the source size in the manifest.
2243 manifest.Set(src_url.url_string, 'size', src_obj_size) 2741 manifest.Set(src_url.url_string, 'size', src_obj_size)
2244 2742
2245 if (dst_url.scheme == 's3' and src_obj_size > S3_MAX_UPLOAD_SIZE 2743 if (dst_url.scheme == 's3' and src_obj_size > S3_MAX_UPLOAD_SIZE
(...skipping 55 matching lines...)
2301 _SetContentTypeFromFile(src_url, dst_obj_metadata) 2799 _SetContentTypeFromFile(src_url, dst_obj_metadata)
2302 else: 2800 else:
2303 # Files don't have Cloud API metadata. 2801 # Files don't have Cloud API metadata.
2304 dst_obj_metadata = None 2802 dst_obj_metadata = None
2305 2803
2306 _LogCopyOperation(logger, src_url, dst_url, dst_obj_metadata) 2804 _LogCopyOperation(logger, src_url, dst_url, dst_obj_metadata)
2307 2805
2308 if src_url.IsCloudUrl(): 2806 if src_url.IsCloudUrl():
2309 if dst_url.IsFileUrl(): 2807 if dst_url.IsFileUrl():
2310 return _DownloadObjectToFile(src_url, src_obj_metadata, dst_url, 2808 return _DownloadObjectToFile(src_url, src_obj_metadata, dst_url,
2311 gsutil_api, logger, test_method=test_method) 2809 gsutil_api, logger, command_obj,
2810 copy_exception_handler,
2811 allow_splitting=allow_splitting)
2312 elif copy_in_the_cloud: 2812 elif copy_in_the_cloud:
2313 return _CopyObjToObjInTheCloud(src_url, src_obj_metadata, dst_url, 2813 return _CopyObjToObjInTheCloud(src_url, src_obj_metadata, dst_url,
2314 dst_obj_metadata, preconditions, 2814 dst_obj_metadata, preconditions,
2315 gsutil_api, logger) 2815 gsutil_api, logger)
2316 else: 2816 else:
2317 return _CopyObjToObjDaisyChainMode(src_url, src_obj_metadata, 2817 return _CopyObjToObjDaisyChainMode(src_url, src_obj_metadata,
2318 dst_url, dst_obj_metadata, 2818 dst_url, dst_obj_metadata,
2319 preconditions, gsutil_api, logger) 2819 preconditions, gsutil_api, logger)
2320 else: # src_url.IsFileUrl() 2820 else: # src_url.IsFileUrl()
2321 if dst_url.IsCloudUrl(): 2821 if dst_url.IsCloudUrl():
(...skipping 170 matching lines...)
2492 if past_scheme.find(sep) == -1: 2992 if past_scheme.find(sep) == -1:
2493 return 'file://' 2993 return 'file://'
2494 else: 2994 else:
2495 return 'file://%s' % past_scheme.rstrip(sep).rpartition(sep)[0] 2995 return 'file://%s' % past_scheme.rstrip(sep).rpartition(sep)[0]
2496 if url.IsBucket(): 2996 if url.IsBucket():
2497 return '%s://' % url.scheme 2997 return '%s://' % url.scheme
2498 # Else it names a bucket subdir. 2998 # Else it names a bucket subdir.
2499 return url.url_string.rstrip(sep).rpartition(sep)[0] 2999 return url.url_string.rstrip(sep).rpartition(sep)[0]
2500 3000
2501 3001
2502 def _DivideAndCeil(dividend, divisor):
2503 """Returns ceil(dividend / divisor).
2504
2505 Takes care to avoid the pitfalls of floating point arithmetic that could
2506 otherwise yield the wrong result for large numbers.
2507
2508 Args:
2509 dividend: Dividend for the operation.
2510 divisor: Divisor for the operation.
2511
2512 Returns:
2513 Quotient.
2514 """
2515 quotient = dividend // divisor
2516 if (dividend % divisor) != 0:
2517 quotient += 1
2518 return quotient
2519
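The removed _DivideAndCeil (its callers now call DivideAndCeil instead, as seen in _GetPartitionInfo below) deliberately avoids float division: for very large byte counts a float quotient can round away the remainder and return a ceiling that is too small. A quick demonstration:

import math

def divide_and_ceil(dividend, divisor):
    """Integer-only ceil(dividend / divisor)."""
    quotient = dividend // divisor
    if dividend % divisor:
        quotient += 1
    return quotient

dividend, divisor = 10**17 + 1, 10**17
print(int(math.ceil(dividend / float(divisor))))   # 1 -- float rounds away the +1
print(divide_and_ceil(dividend, divisor))          # 2 -- correct ceiling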
2520
2521 def _GetPartitionInfo(file_size, max_components, default_component_size): 3002 def _GetPartitionInfo(file_size, max_components, default_component_size):
2522 """Gets info about a file partition for parallel composite uploads. 3003 """Gets info about a file partition for parallel file/object transfers.
2523 3004
2524 Args: 3005 Args:
2525 file_size: The number of bytes in the file to be partitioned. 3006 file_size: The number of bytes in the file to be partitioned.
2526 max_components: The maximum number of components that can be composed. 3007 max_components: The maximum number of components that can be composed.
2527 default_component_size: The size of a component, assuming that 3008 default_component_size: The size of a component, assuming that
2528 max_components is infinite. 3009 max_components is infinite.
2529 Returns: 3010 Returns:
2530 The number of components in the partitioned file, and the size of each 3011 The number of components in the partitioned file, and the size of each
2531 component (except the last, which will have a different size iff 3012 component (except the last, which will have a different size iff
2532 file_size != 0 (mod num_components)). 3013 file_size != 0 (mod num_components)).
2533 """ 3014 """
2534 # num_components = ceil(file_size / default_component_size) 3015 # num_components = ceil(file_size / default_component_size)
2535 num_components = _DivideAndCeil(file_size, default_component_size) 3016 num_components = DivideAndCeil(file_size, default_component_size)
2536 3017
2537 # num_components must be in the range [2, max_components] 3018 # num_components must be in the range [2, max_components]
2538 num_components = max(min(num_components, max_components), 2) 3019 num_components = max(min(num_components, max_components), 2)
2539 3020
2540 # component_size = ceil(file_size / num_components) 3021 # component_size = ceil(file_size / num_components)
2541 component_size = _DivideAndCeil(file_size, num_components) 3022 component_size = DivideAndCeil(file_size, num_components)
2542 return (num_components, component_size) 3023 return (num_components, component_size)
2543 3024
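A worked example of the partition computation with made-up numbers (a 300 MiB object, a 50 MiB default component size, and a cap of 4 components), using a local ceil-division helper in place of DivideAndCeil:

def divide_and_ceil(dividend, divisor):
    return dividend // divisor + (1 if dividend % divisor else 0)

def get_partition_info(file_size, max_components, default_component_size):
    num_components = divide_and_ceil(file_size, default_component_size)
    num_components = max(min(num_components, max_components), 2)
    component_size = divide_and_ceil(file_size, num_components)
    return num_components, component_size

mib = 1024 * 1024
print(get_partition_info(300 * mib, 4, 50 * mib))   # (4, 78643200): 4 x 75 MiB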
2544 3025
2545 def _DeleteObjectFn(cls, url_to_delete, thread_state=None): 3026 def _DeleteTempComponentObjectFn(cls, url_to_delete, thread_state=None):
2546 """Wrapper function to be used with command.Apply().""" 3027 """Wrapper func to be used with command.Apply to delete temporary objects."""
2547 gsutil_api = GetCloudApiInstance(cls, thread_state) 3028 gsutil_api = GetCloudApiInstance(cls, thread_state)
2548 gsutil_api.DeleteObject( 3029 try:
2549 url_to_delete.bucket_name, url_to_delete.object_name, 3030 gsutil_api.DeleteObject(
2550 generation=url_to_delete.generation, provider=url_to_delete.scheme) 3031 url_to_delete.bucket_name, url_to_delete.object_name,
3032 generation=url_to_delete.generation, provider=url_to_delete.scheme)
3033 except NotFoundException:
3034 # The temporary object could already be gone if a retry was
3035 # issued at a lower layer but the original request succeeded.
3036 # Barring other errors, the top-level command should still report success,
3037 # so don't raise here.
3038 pass
2551 3039
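The NotFoundException handling above makes temporary-component deletion idempotent: if a lower-layer retry already removed the object after the original request actually succeeded, the cleanup still counts as success. The same pattern in miniature, with a plain dict standing in for the bucket:

class NotFoundException(Exception):
    """Stand-in for the cloud API's not-found error."""

fake_bucket = {'composite_tmp_component_0': b'...'}

def delete_object(name):
    try:
        del fake_bucket[name]
    except KeyError:
        raise NotFoundException(name)

def delete_temp_component(name):
    try:
        delete_object(name)
    except NotFoundException:
        # Already deleted (possibly by a retried request); treat as success.
        pass

delete_temp_component('composite_tmp_component_0')
delete_temp_component('composite_tmp_component_0')   # Second call is a no-op.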
2552 3040
2553 def _ParseParallelUploadTrackerFile(tracker_file, tracker_file_lock): 3041 def _ParseParallelUploadTrackerFile(tracker_file, tracker_file_lock):
2554 """Parse the tracker file from the last parallel composite upload attempt. 3042 """Parse the tracker file from the last parallel composite upload attempt.
2555 3043
2556 If it exists, the tracker file is of the format described in 3044 If it exists, the tracker file is of the format described in
2557 _CreateParallelUploadTrackerFile. If the file doesn't exist or cannot be 3045 _CreateParallelUploadTrackerFile. If the file doesn't exist or cannot be
2558 read, then the upload will start from the beginning. 3046 read, then the upload will start from the beginning.
2559 3047
2560 Args: 3048 Args:
(...skipping 217 matching lines...)
2778 url.generation = tracker_object.generation 3266 url.generation = tracker_object.generation
2779 uploaded_components.append(url) 3267 uploaded_components.append(url)
2780 objects_already_chosen.append(tracker_object.object_name) 3268 objects_already_chosen.append(tracker_object.object_name)
2781 3269
2782 if uploaded_components: 3270 if uploaded_components:
2783 logging.info('Found %d existing temporary components to reuse.', 3271 logging.info('Found %d existing temporary components to reuse.',
2784 len(uploaded_components)) 3272 len(uploaded_components))
2785 3273
2786 return (components_to_upload, uploaded_components, 3274 return (components_to_upload, uploaded_components,
2787 existing_objects_to_delete) 3275 existing_objects_to_delete)