OLD | NEW |
1 # -*- coding: utf-8 -*- | 1 # -*- coding: utf-8 -*- |
2 # Copyright 2011 Google Inc. All Rights Reserved. | 2 # Copyright 2011 Google Inc. All Rights Reserved. |
3 # Copyright 2011, Nexenta Systems Inc. | 3 # Copyright 2011, Nexenta Systems Inc. |
4 # | 4 # |
5 # Licensed under the Apache License, Version 2.0 (the "License"); | 5 # Licensed under the Apache License, Version 2.0 (the "License"); |
6 # you may not use this file except in compliance with the License. | 6 # you may not use this file except in compliance with the License. |
7 # You may obtain a copy of the License at | 7 # You may obtain a copy of the License at |
8 # | 8 # |
9 # http://www.apache.org/licenses/LICENSE-2.0 | 9 # http://www.apache.org/licenses/LICENSE-2.0 |
10 # | 10 # |
11 # Unless required by applicable law or agreed to in writing, software | 11 # Unless required by applicable law or agreed to in writing, software |
12 # distributed under the License is distributed on an "AS IS" BASIS, | 12 # distributed under the License is distributed on an "AS IS" BASIS, |
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
14 # See the License for the specific language governing permissions and | 14 # See the License for the specific language governing permissions and |
15 # limitations under the License. | 15 # limitations under the License. |
16 """Helper functions for copy functionality.""" | 16 """Helper functions for copy functionality.""" |
17 | 17 |
18 from __future__ import absolute_import | 18 from __future__ import absolute_import |
19 | 19 |
20 import base64 | 20 import base64 |
21 from collections import namedtuple | 21 from collections import namedtuple |
22 import csv | 22 import csv |
23 import datetime | 23 import datetime |
24 import errno | 24 import errno |
25 import gzip | 25 import gzip |
26 from hashlib import md5 | 26 from hashlib import md5 |
27 import json | 27 import json |
28 import logging | 28 import logging |
29 import mimetypes | 29 import mimetypes |
30 import multiprocessing | 30 from operator import attrgetter |
31 import os | 31 import os |
32 import pickle | 32 import pickle |
33 import random | 33 import random |
34 import re | 34 import re |
35 import shutil | 35 import shutil |
36 import stat | 36 import stat |
37 import subprocess | 37 import subprocess |
38 import tempfile | 38 import tempfile |
39 import textwrap | 39 import textwrap |
40 import time | 40 import time |
41 import traceback | 41 import traceback |
42 | 42 |
43 from boto import config | 43 from boto import config |
44 import crcmod | 44 import crcmod |
45 | 45 |
46 import gslib | 46 import gslib |
47 from gslib.cloud_api import ArgumentException | 47 from gslib.cloud_api import ArgumentException |
48 from gslib.cloud_api import CloudApi | 48 from gslib.cloud_api import CloudApi |
49 from gslib.cloud_api import NotFoundException | 49 from gslib.cloud_api import NotFoundException |
50 from gslib.cloud_api import PreconditionException | 50 from gslib.cloud_api import PreconditionException |
51 from gslib.cloud_api import Preconditions | 51 from gslib.cloud_api import Preconditions |
52 from gslib.cloud_api import ResumableDownloadException | 52 from gslib.cloud_api import ResumableDownloadException |
53 from gslib.cloud_api import ResumableUploadAbortException | 53 from gslib.cloud_api import ResumableUploadAbortException |
54 from gslib.cloud_api import ResumableUploadException | 54 from gslib.cloud_api import ResumableUploadException |
55 from gslib.cloud_api import ResumableUploadStartOverException | 55 from gslib.cloud_api import ResumableUploadStartOverException |
56 from gslib.cloud_api_helper import GetDownloadSerializationDict | 56 from gslib.cloud_api_helper import GetDownloadSerializationData |
57 from gslib.commands.compose import MAX_COMPOSE_ARITY | 57 from gslib.commands.compose import MAX_COMPOSE_ARITY |
58 from gslib.commands.config import DEFAULT_PARALLEL_COMPOSITE_UPLOAD_COMPONENT_SIZE | 58 from gslib.commands.config import DEFAULT_PARALLEL_COMPOSITE_UPLOAD_COMPONENT_SIZE |
59 from gslib.commands.config import DEFAULT_PARALLEL_COMPOSITE_UPLOAD_THRESHOLD | 59 from gslib.commands.config import DEFAULT_PARALLEL_COMPOSITE_UPLOAD_THRESHOLD |
| 60 from gslib.commands.config import DEFAULT_SLICED_OBJECT_DOWNLOAD_COMPONENT_SIZE |
| 61 from gslib.commands.config import DEFAULT_SLICED_OBJECT_DOWNLOAD_MAX_COMPONENTS |
| 62 from gslib.commands.config import DEFAULT_SLICED_OBJECT_DOWNLOAD_THRESHOLD |
60 from gslib.cs_api_map import ApiSelector | 63 from gslib.cs_api_map import ApiSelector |
61 from gslib.daisy_chain_wrapper import DaisyChainWrapper | 64 from gslib.daisy_chain_wrapper import DaisyChainWrapper |
62 from gslib.exception import CommandException | 65 from gslib.exception import CommandException |
63 from gslib.exception import HashMismatchException | 66 from gslib.exception import HashMismatchException |
64 from gslib.file_part import FilePart | 67 from gslib.file_part import FilePart |
65 from gslib.hashing_helper import Base64EncodeHash | 68 from gslib.hashing_helper import Base64EncodeHash |
66 from gslib.hashing_helper import CalculateB64EncodedMd5FromContents | 69 from gslib.hashing_helper import CalculateB64EncodedMd5FromContents |
67 from gslib.hashing_helper import CalculateHashesFromContents | 70 from gslib.hashing_helper import CalculateHashesFromContents |
| 71 from gslib.hashing_helper import CHECK_HASH_IF_FAST_ELSE_FAIL |
| 72 from gslib.hashing_helper import CHECK_HASH_NEVER |
| 73 from gslib.hashing_helper import ConcatCrc32c |
68 from gslib.hashing_helper import GetDownloadHashAlgs | 74 from gslib.hashing_helper import GetDownloadHashAlgs |
69 from gslib.hashing_helper import GetUploadHashAlgs | 75 from gslib.hashing_helper import GetUploadHashAlgs |
70 from gslib.hashing_helper import HashingFileUploadWrapper | 76 from gslib.hashing_helper import HashingFileUploadWrapper |
71 from gslib.parallelism_framework_util import ThreadAndProcessSafeDict | 77 from gslib.parallelism_framework_util import AtomicDict |
72 from gslib.parallelism_framework_util import ThreadSafeDict | |
73 from gslib.progress_callback import ConstructAnnounceText | 78 from gslib.progress_callback import ConstructAnnounceText |
74 from gslib.progress_callback import FileProgressCallbackHandler | 79 from gslib.progress_callback import FileProgressCallbackHandler |
75 from gslib.progress_callback import ProgressCallbackWithBackoff | 80 from gslib.progress_callback import ProgressCallbackWithBackoff |
76 from gslib.resumable_streaming_upload import ResumableStreamingJsonUploadWrapper | 81 from gslib.resumable_streaming_upload import ResumableStreamingJsonUploadWrapper |
77 from gslib.storage_url import ContainsWildcard | 82 from gslib.storage_url import ContainsWildcard |
78 from gslib.storage_url import StorageUrlFromString | 83 from gslib.storage_url import StorageUrlFromString |
79 from gslib.third_party.storage_apitools import storage_v1_messages as apitools_messages | 84 from gslib.third_party.storage_apitools import storage_v1_messages as apitools_messages |
| 85 from gslib.tracker_file import DeleteDownloadTrackerFiles |
80 from gslib.tracker_file import DeleteTrackerFile | 86 from gslib.tracker_file import DeleteTrackerFile |
81 from gslib.tracker_file import GetTrackerFilePath | 87 from gslib.tracker_file import GetTrackerFilePath |
82 from gslib.tracker_file import RaiseUnwritableTrackerFileException | 88 from gslib.tracker_file import RaiseUnwritableTrackerFileException |
83 from gslib.tracker_file import ReadOrCreateDownloadTrackerFile | 89 from gslib.tracker_file import ReadOrCreateDownloadTrackerFile |
84 from gslib.tracker_file import TrackerFileType | 90 from gslib.tracker_file import TrackerFileType |
| 91 from gslib.tracker_file import WriteDownloadComponentTrackerFile |
85 from gslib.translation_helper import AddS3MarkerAclToObjectMetadata | 92 from gslib.translation_helper import AddS3MarkerAclToObjectMetadata |
86 from gslib.translation_helper import CopyObjectMetadata | 93 from gslib.translation_helper import CopyObjectMetadata |
87 from gslib.translation_helper import DEFAULT_CONTENT_TYPE | 94 from gslib.translation_helper import DEFAULT_CONTENT_TYPE |
88 from gslib.translation_helper import GenerationFromUrlAndString | 95 from gslib.translation_helper import GenerationFromUrlAndString |
89 from gslib.translation_helper import ObjectMetadataFromHeaders | 96 from gslib.translation_helper import ObjectMetadataFromHeaders |
90 from gslib.translation_helper import PreconditionsFromHeaders | 97 from gslib.translation_helper import PreconditionsFromHeaders |
91 from gslib.translation_helper import S3MarkerAclFromObjectMetadata | 98 from gslib.translation_helper import S3MarkerAclFromObjectMetadata |
| 99 from gslib.util import CheckFreeSpace |
| 100 from gslib.util import CheckMultiprocessingAvailableAndInit |
92 from gslib.util import CreateLock | 101 from gslib.util import CreateLock |
93 from gslib.util import DEFAULT_FILE_BUFFER_SIZE | 102 from gslib.util import DEFAULT_FILE_BUFFER_SIZE |
| 103 from gslib.util import DivideAndCeil |
94 from gslib.util import GetCloudApiInstance | 104 from gslib.util import GetCloudApiInstance |
95 from gslib.util import GetFileSize | 105 from gslib.util import GetFileSize |
96 from gslib.util import GetJsonResumableChunkSize | 106 from gslib.util import GetJsonResumableChunkSize |
97 from gslib.util import GetMaxRetryDelay | 107 from gslib.util import GetMaxRetryDelay |
98 from gslib.util import GetNumRetries | 108 from gslib.util import GetNumRetries |
99 from gslib.util import GetStreamFromFileUrl | 109 from gslib.util import GetStreamFromFileUrl |
100 from gslib.util import HumanReadableToBytes | 110 from gslib.util import HumanReadableToBytes |
101 from gslib.util import IS_WINDOWS | 111 from gslib.util import IS_WINDOWS |
102 from gslib.util import IsCloudSubdirPlaceholder | 112 from gslib.util import IsCloudSubdirPlaceholder |
103 from gslib.util import MakeHumanReadable | 113 from gslib.util import MakeHumanReadable |
104 from gslib.util import MIN_SIZE_COMPUTE_LOGGING | 114 from gslib.util import MIN_SIZE_COMPUTE_LOGGING |
105 from gslib.util import MultiprocessingIsAvailable | |
106 from gslib.util import ResumableThreshold | 115 from gslib.util import ResumableThreshold |
107 from gslib.util import TEN_MIB | 116 from gslib.util import TEN_MIB |
| 117 from gslib.util import UsingCrcmodExtension |
108 from gslib.util import UTF8 | 118 from gslib.util import UTF8 |
109 from gslib.wildcard_iterator import CreateWildcardIterator | 119 from gslib.wildcard_iterator import CreateWildcardIterator |
110 | 120 |
111 # pylint: disable=g-import-not-at-top | 121 # pylint: disable=g-import-not-at-top |
112 if IS_WINDOWS: | 122 if IS_WINDOWS: |
113 import msvcrt | 123 import msvcrt |
114 from ctypes import c_int | |
115 from ctypes import c_uint64 | |
116 from ctypes import c_char_p | |
117 from ctypes import c_wchar_p | |
118 from ctypes import windll | |
119 from ctypes import POINTER | |
120 from ctypes import WINFUNCTYPE | |
121 from ctypes import WinError | |
122 | 124 |
123 # Declare copy_helper_opts as a global because namedtuple isn't aware of | 125 # Declare copy_helper_opts as a global because namedtuple isn't aware of |
124 # assigning to a class member (which breaks pickling done by multiprocessing). | 126 # assigning to a class member (which breaks pickling done by multiprocessing). |
125 # For details see | 127 # For details see |
126 # http://stackoverflow.com/questions/16377215/how-to-pickle-a-namedtuple-instance-correctly | 128 # http://stackoverflow.com/questions/16377215/how-to-pickle-a-namedtuple-instance-correctly |
127 # Similarly can't pickle logger. | |
128 # pylint: disable=global-at-module-level | 129 # pylint: disable=global-at-module-level |
129 global global_copy_helper_opts, global_logger | 130 global global_copy_helper_opts |
130 | 131 |
131 # In-memory map of local files that are currently opened for write. Used to | 132 # In-memory map of local files that are currently opened for write. Used to |
132 # ensure that if we write to the same file twice (say, for example, because the | 133 # ensure that if we write to the same file twice (say, for example, because the |
133 # user specified two identical source URLs), the writes occur serially. | 134 # user specified two identical source URLs), the writes occur serially. |
134 global open_files_map | 135 global open_files_map, open_files_lock |
135 open_files_map = ( | 136 open_files_map = ( |
136 ThreadSafeDict() if (IS_WINDOWS or not MultiprocessingIsAvailable()[0]) | 137 AtomicDict() if not CheckMultiprocessingAvailableAndInit().is_available |
137 else ThreadAndProcessSafeDict(multiprocessing.Manager())) | 138 else AtomicDict(manager=gslib.util.manager)) |
| 139 |
| 140 # We don't allow multiple processes on Windows, so using a process-safe lock |
| 141 # would be unnecessary. |
| 142 open_files_lock = CreateLock() |
138 | 143 |
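A minimal sketch of the serialization this map enables (the helper name below is hypothetical; the CL's actual check lives in the download path): a worker claims the destination path under open_files_lock before writing, and a later worker that finds the path already claimed skips its duplicate copy rather than writing concurrently.

    def _SketchClaimDestinationFile(file_name):
      # Claim file_name for writing, or skip a concurrent duplicate copy.
      with open_files_lock:
        if open_files_map.get(file_name, False):
          raise FileConcurrencySkipError()  # duplicate source URL; skip it
        open_files_map[file_name] = True
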
139 # For debugging purposes; if True, files and objects that fail hash validation | 144 # For debugging purposes; if True, files and objects that fail hash validation |
140 # will be saved with the below suffix appended. | 145 # will be saved with the below suffix appended. |
141 _RENAME_ON_HASH_MISMATCH = False | 146 _RENAME_ON_HASH_MISMATCH = False |
142 _RENAME_ON_HASH_MISMATCH_SUFFIX = '_corrupt' | 147 _RENAME_ON_HASH_MISMATCH_SUFFIX = '_corrupt' |
143 | 148 |
144 PARALLEL_UPLOAD_TEMP_NAMESPACE = ( | 149 PARALLEL_UPLOAD_TEMP_NAMESPACE = ( |
145 u'/gsutil/tmp/parallel_composite_uploads/for_details_see/gsutil_help_cp/') | 150 u'/gsutil/tmp/parallel_composite_uploads/for_details_see/gsutil_help_cp/') |
146 | 151 |
147 PARALLEL_UPLOAD_STATIC_SALT = u""" | 152 PARALLEL_UPLOAD_STATIC_SALT = u""" |
(...skipping 23 matching lines...)
171 # canned_acl: canned_acl to apply to the uploaded file/component. | 176 # canned_acl: canned_acl to apply to the uploaded file/component. |
172 # content_type: content-type for final object, used for setting content-type | 177 # content_type: content-type for final object, used for setting content-type |
173 # of components and final object. | 178 # of components and final object. |
174 # tracker_file: tracker file for this component. | 179 # tracker_file: tracker file for this component. |
175 # tracker_file_lock: tracker file lock for tracker file(s). | 180 # tracker_file_lock: tracker file lock for tracker file(s). |
176 PerformParallelUploadFileToObjectArgs = namedtuple( | 181 PerformParallelUploadFileToObjectArgs = namedtuple( |
177 'PerformParallelUploadFileToObjectArgs', | 182 'PerformParallelUploadFileToObjectArgs', |
178 'filename file_start file_length src_url dst_url canned_acl ' | 183 'filename file_start file_length src_url dst_url canned_acl ' |
179 'content_type tracker_file tracker_file_lock') | 184 'content_type tracker_file tracker_file_lock') |
180 | 185 |
| 186 PerformSlicedDownloadObjectToFileArgs = namedtuple( |
| 187 'PerformSlicedDownloadObjectToFileArgs', |
| 188 'component_num src_url src_obj_metadata dst_url download_file_name ' |
| 189 'start_byte end_byte') |
| 190 |
| 191 PerformSlicedDownloadReturnValues = namedtuple( |
| 192 'PerformSlicedDownloadReturnValues', |
| 193 'component_num crc32c bytes_transferred server_encoding') |
| 194 |
181 ObjectFromTracker = namedtuple('ObjectFromTracker', | 195 ObjectFromTracker = namedtuple('ObjectFromTracker', |
182 'object_name generation') | 196 'object_name generation') |
183 | 197 |
184 # TODO: Refactor this file to be less cumbersome. In particular, some of the | 198 # TODO: Refactor this file to be less cumbersome. In particular, some of the |
185 # different paths (e.g., uploading a file to an object vs. downloading an | 199 # different paths (e.g., uploading a file to an object vs. downloading an |
186 # object to a file) could be split into separate files. | 200 # object to a file) could be split into separate files. |
187 | 201 |
188 # Chunk size to use while zipping/unzipping gzip files. | 202 # Chunk size to use while zipping/unzipping gzip files. |
189 GZIP_CHUNK_SIZE = 8192 | 203 GZIP_CHUNK_SIZE = 8192 |
190 | 204 |
| 205 # Number of bytes to wait before updating a sliced download component tracker |
| 206 # file. |
| 207 TRACKERFILE_UPDATE_THRESHOLD = TEN_MIB |
| 208 |
191 PARALLEL_COMPOSITE_SUGGESTION_THRESHOLD = 150 * 1024 * 1024 | 209 PARALLEL_COMPOSITE_SUGGESTION_THRESHOLD = 150 * 1024 * 1024 |
192 | 210 |
193 # S3 requires special Multipart upload logic (that we currently don't implement) | 211 # S3 requires special Multipart upload logic (that we currently don't implement) |
194 # for files > 5GiB in size. | 212 # for files > 5GiB in size. |
195 S3_MAX_UPLOAD_SIZE = 5 * 1024 * 1024 * 1024 | 213 S3_MAX_UPLOAD_SIZE = 5 * 1024 * 1024 * 1024 |
196 | 214 |
197 suggested_parallel_composites = False | 215 # TODO: Create a multiprocessing framework value allocator, then use it instead |
| 216 # of a dict. |
| 217 global suggested_sliced_transfers, suggested_sliced_transfers_lock |
| 218 suggested_sliced_transfers = ( |
| 219 AtomicDict() if not CheckMultiprocessingAvailableAndInit().is_available |
| 220 else AtomicDict(manager=gslib.util.manager)) |
| 221 suggested_sliced_transfers_lock = CreateLock() |
198 | 222 |
199 | 223 |
200 class FileConcurrencySkipError(Exception): | 224 class FileConcurrencySkipError(Exception): |
201 """Raised when skipping a file due to a concurrent, duplicate copy.""" | 225 """Raised when skipping a file due to a concurrent, duplicate copy.""" |
202 | 226 |
203 | 227 |
204 def _RmExceptionHandler(cls, e): | 228 def _RmExceptionHandler(cls, e): |
205 """Simple exception handler to allow post-completion status.""" | 229 """Simple exception handler to allow post-completion status.""" |
206 cls.logger.error(str(e)) | 230 cls.logger.error(str(e)) |
207 | 231 |
208 | 232 |
209 def _ParallelUploadCopyExceptionHandler(cls, e): | 233 def _ParallelCopyExceptionHandler(cls, e): |
210 """Simple exception handler to allow post-completion status.""" | 234 """Simple exception handler to allow post-completion status.""" |
211 cls.logger.error(str(e)) | 235 cls.logger.error(str(e)) |
212 cls.op_failure_count += 1 | 236 cls.op_failure_count += 1 |
213 cls.logger.debug('\n\nEncountered exception while copying:\n%s\n', | 237 cls.logger.debug('\n\nEncountered exception while copying:\n%s\n', |
214 traceback.format_exc()) | 238 traceback.format_exc()) |
215 | 239 |
216 | 240 |
217 def _PerformParallelUploadFileToObject(cls, args, thread_state=None): | 241 def _PerformParallelUploadFileToObject(cls, args, thread_state=None): |
218 """Function argument to Apply for performing parallel composite uploads. | 242 """Function argument to Apply for performing parallel composite uploads. |
219 | 243 |
(...skipping 21 matching lines...)
241 | 265 |
242 try: | 266 try: |
243 if global_copy_helper_opts.canned_acl: | 267 if global_copy_helper_opts.canned_acl: |
244 # No canned ACL support in JSON, force XML API to be used for | 268 # No canned ACL support in JSON, force XML API to be used for |
245 # upload/copy operations. | 269 # upload/copy operations. |
246 orig_prefer_api = gsutil_api.prefer_api | 270 orig_prefer_api = gsutil_api.prefer_api |
247 gsutil_api.prefer_api = ApiSelector.XML | 271 gsutil_api.prefer_api = ApiSelector.XML |
248 ret = _UploadFileToObject(args.src_url, fp, args.file_length, | 272 ret = _UploadFileToObject(args.src_url, fp, args.file_length, |
249 args.dst_url, dst_object_metadata, | 273 args.dst_url, dst_object_metadata, |
250 preconditions, gsutil_api, cls.logger, cls, | 274 preconditions, gsutil_api, cls.logger, cls, |
251 _ParallelUploadCopyExceptionHandler, | 275 _ParallelCopyExceptionHandler, |
252 gzip_exts=None, allow_splitting=False) | 276 gzip_exts=None, allow_splitting=False) |
253 finally: | 277 finally: |
254 if global_copy_helper_opts.canned_acl: | 278 if global_copy_helper_opts.canned_acl: |
255 gsutil_api.prefer_api = orig_prefer_api | 279 gsutil_api.prefer_api = orig_prefer_api |
256 | 280 |
257 component = ret[2] | 281 component = ret[2] |
258 _AppendComponentTrackerToParallelUploadTrackerFile( | 282 _AppendComponentTrackerToParallelUploadTrackerFile( |
259 args.tracker_file, component, args.tracker_file_lock) | 283 args.tracker_file, component, args.tracker_file_lock) |
260 return ret | 284 return ret |
261 | 285 |
(...skipping 372 matching lines...)
634 | 658 |
635 def _CreateDigestsFromDigesters(digesters): | 659 def _CreateDigestsFromDigesters(digesters): |
636 digests = {} | 660 digests = {} |
637 if digesters: | 661 if digesters: |
638 for alg in digesters: | 662 for alg in digesters: |
639 digests[alg] = base64.encodestring( | 663 digests[alg] = base64.encodestring( |
640 digesters[alg].digest()).rstrip('\n') | 664 digesters[alg].digest()).rstrip('\n') |
641 return digests | 665 return digests |
642 | 666 |
643 | 667 |
644 def _CreateDigestsFromLocalFile(logger, algs, file_name, src_obj_metadata): | 668 def _CreateDigestsFromLocalFile(logger, algs, file_name, final_file_name, |
| 669 src_obj_metadata): |
645 """Creates a base64 CRC32C and/or MD5 digest from file_name. | 670 """Creates a base64 CRC32C and/or MD5 digest from file_name. |
646 | 671 |
647 Args: | 672 Args: |
648 logger: for outputting log messages. | 673 logger: For outputting log messages. |
649 algs: list of algorithms to compute. | 674 algs: List of algorithms to compute. |
650 file_name: file to digest. | 675 file_name: File to digest. |
651 src_obj_metadata: metadta of source object. | 676 final_file_name: Permanent location to be used for the downloaded file |
| 677 after validation (used for logging). |
| 678 src_obj_metadata: Metadata of source object. |
652 | 679 |
653 Returns: | 680 Returns: |
654 Dict of algorithm name : base 64 encoded digest | 681 Dict of algorithm name : base 64 encoded digest |
655 """ | 682 """ |
656 hash_dict = {} | 683 hash_dict = {} |
657 if 'md5' in algs: | 684 if 'md5' in algs: |
658 if src_obj_metadata.size and src_obj_metadata.size > TEN_MIB: | |
659 logger.info( | |
660 'Computing MD5 for %s...', file_name) | |
661 hash_dict['md5'] = md5() | 685 hash_dict['md5'] = md5() |
662 if 'crc32c' in algs: | 686 if 'crc32c' in algs: |
663 hash_dict['crc32c'] = crcmod.predefined.Crc('crc-32c') | 687 hash_dict['crc32c'] = crcmod.predefined.Crc('crc-32c') |
664 with open(file_name, 'rb') as fp: | 688 with open(file_name, 'rb') as fp: |
665 CalculateHashesFromContents( | 689 CalculateHashesFromContents( |
666 fp, hash_dict, ProgressCallbackWithBackoff( | 690 fp, hash_dict, ProgressCallbackWithBackoff( |
667 src_obj_metadata.size, | 691 src_obj_metadata.size, |
668 FileProgressCallbackHandler( | 692 FileProgressCallbackHandler( |
669 ConstructAnnounceText('Hashing', file_name), logger).call)) | 693 ConstructAnnounceText('Hashing', final_file_name), |
| 694 logger).call)) |
670 digests = {} | 695 digests = {} |
671 for alg_name, digest in hash_dict.iteritems(): | 696 for alg_name, digest in hash_dict.iteritems(): |
672 digests[alg_name] = Base64EncodeHash(digest.hexdigest()) | 697 digests[alg_name] = Base64EncodeHash(digest.hexdigest()) |
673 return digests | 698 return digests |
674 | 699 |
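For reference, the digests built above can also be computed without the gsutil helpers; a minimal sketch using only md5, crcmod, and base64 as imported at the top of this file (the function name is hypothetical and the chunk size arbitrary):

    def _SketchBase64Digests(file_name):
      md5_digest = md5()
      crc32c_digest = crcmod.predefined.Crc('crc-32c')
      with open(file_name, 'rb') as fp:
        data = fp.read(8192)
        while data:
          md5_digest.update(data)
          crc32c_digest.update(data)
          data = fp.read(8192)
      return {'md5': base64.b64encode(md5_digest.digest()),
              'crc32c': base64.b64encode(crc32c_digest.digest())}
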
675 | 700 |
676 def _CheckCloudHashes(logger, src_url, dst_url, src_obj_metadata, | 701 def _CheckCloudHashes(logger, src_url, dst_url, src_obj_metadata, |
677 dst_obj_metadata): | 702 dst_obj_metadata): |
678 """Validates integrity of two cloud objects copied via daisy-chain. | 703 """Validates integrity of two cloud objects copied via daisy-chain. |
679 | 704 |
(...skipping 43 matching lines...)
723 | 748 |
724 | 749 |
725 def _CheckHashes(logger, obj_url, obj_metadata, file_name, digests, | 750 def _CheckHashes(logger, obj_url, obj_metadata, file_name, digests, |
726 is_upload=False): | 751 is_upload=False): |
727 """Validates integrity by comparing cloud digest to local digest. | 752 """Validates integrity by comparing cloud digest to local digest. |
728 | 753 |
729 Args: | 754 Args: |
730 logger: for outputting log messages. | 755 logger: for outputting log messages. |
731 obj_url: CloudUrl for cloud object. | 756 obj_url: CloudUrl for cloud object. |
732 obj_metadata: Cloud Object being downloaded from or uploaded to. | 757 obj_metadata: Cloud Object being downloaded from or uploaded to. |
733 file_name: Local file name on disk being downloaded to or uploaded from. | 758 file_name: Local file name on disk being downloaded to or uploaded from |
| 759 (used only for logging). |
734 digests: Computed Digests for the object. | 760 digests: Computed Digests for the object. |
735 is_upload: If true, comparing for an uploaded object (controls logging). | 761 is_upload: If true, comparing for an uploaded object (controls logging). |
736 | 762 |
737 Raises: | 763 Raises: |
738 CommandException: if cloud digests don't match local digests. | 764 CommandException: if cloud digests don't match local digests. |
739 """ | 765 """ |
740 local_hashes = digests | 766 local_hashes = digests |
741 cloud_hashes = {} | 767 cloud_hashes = {} |
742 if obj_metadata.md5Hash: | 768 if obj_metadata.md5Hash: |
743 cloud_hashes['md5'] = obj_metadata.md5Hash.rstrip('\n') | 769 cloud_hashes['md5'] = obj_metadata.md5Hash.rstrip('\n') |
(...skipping 234 matching lines...)
978 request_components, dst_obj_metadata, preconditions=preconditions, | 1004 request_components, dst_obj_metadata, preconditions=preconditions, |
979 provider=dst_url.scheme, fields=['generation', 'crc32c', 'size']) | 1005 provider=dst_url.scheme, fields=['generation', 'crc32c', 'size']) |
980 | 1006 |
981 try: | 1007 try: |
982 # Make sure only to delete things that we know were successfully | 1008 # Make sure only to delete things that we know were successfully |
983 # uploaded (as opposed to all of the objects that we attempted to | 1009 # uploaded (as opposed to all of the objects that we attempted to |
984 # create) so that we don't delete any preexisting objects, except for | 1010 # create) so that we don't delete any preexisting objects, except for |
985 # those that were uploaded by a previous, failed run and have since | 1011 # those that were uploaded by a previous, failed run and have since |
986 # changed (but still have an old generation lying around). | 1012 # changed (but still have an old generation lying around). |
987 objects_to_delete = components + existing_objects_to_delete | 1013 objects_to_delete = components + existing_objects_to_delete |
988 command_obj.Apply(_DeleteObjectFn, objects_to_delete, _RmExceptionHandler, | 1014 command_obj.Apply( |
989 arg_checker=gslib.command.DummyArgChecker, | 1015 _DeleteTempComponentObjectFn, objects_to_delete, _RmExceptionHandler, |
990 parallel_operations_override=True) | 1016 arg_checker=gslib.command.DummyArgChecker, |
| 1017 parallel_operations_override=True) |
991 except Exception: # pylint: disable=broad-except | 1018 except Exception: # pylint: disable=broad-except |
992 # If some of the delete calls fail, don't cause the whole command to | 1019 # If some of the delete calls fail, don't cause the whole command to |
993 # fail. The copy was successful iff the compose call succeeded, so | 1020 # fail. The copy was successful iff the compose call succeeded, so |
994 # reduce this to a warning. | 1021 # reduce this to a warning. |
995 logging.warning( | 1022 logging.warning( |
996 'Failed to delete some of the following temporary objects:\n' + | 1023 'Failed to delete some of the following temporary objects:\n' + |
997 '\n'.join(dst_args.keys())) | 1024 '\n'.join(dst_args.keys())) |
998 finally: | 1025 finally: |
999 with tracker_file_lock: | 1026 with tracker_file_lock: |
1000 if os.path.exists(tracker_file): | 1027 if os.path.exists(tracker_file): |
(...skipping 17 matching lines...)
1018 logger: for outputting log messages. | 1045 logger: for outputting log messages. |
1019 allow_splitting: If false, then this function returns false. | 1046 allow_splitting: If false, then this function returns false. |
1020 src_url: FileUrl corresponding to a local file. | 1047 src_url: FileUrl corresponding to a local file. |
1021 dst_url: CloudUrl corresponding to destination cloud object. | 1048 dst_url: CloudUrl corresponding to destination cloud object. |
1022 file_size: The size of the source file, in bytes. | 1049 file_size: The size of the source file, in bytes. |
1023 canned_acl: Canned ACL to apply to destination object, if any. | 1050 canned_acl: Canned ACL to apply to destination object, if any. |
1024 | 1051 |
1025 Returns: | 1052 Returns: |
1026 True iff a parallel upload should be performed on the source file. | 1053 True iff a parallel upload should be performed on the source file. |
1027 """ | 1054 """ |
1028 global suggested_parallel_composites | 1055 global suggested_sliced_transfers, suggested_sliced_transfers_lock |
1029 parallel_composite_upload_threshold = HumanReadableToBytes(config.get( | 1056 parallel_composite_upload_threshold = HumanReadableToBytes(config.get( |
1030 'GSUtil', 'parallel_composite_upload_threshold', | 1057 'GSUtil', 'parallel_composite_upload_threshold', |
1031 DEFAULT_PARALLEL_COMPOSITE_UPLOAD_THRESHOLD)) | 1058 DEFAULT_PARALLEL_COMPOSITE_UPLOAD_THRESHOLD)) |
1032 | 1059 |
1033 all_factors_but_size = ( | 1060 all_factors_but_size = ( |
1034 allow_splitting # Don't split the pieces multiple times. | 1061 allow_splitting # Don't split the pieces multiple times. |
1035 and not src_url.IsStream() # We can't partition streams. | 1062 and not src_url.IsStream() # We can't partition streams. |
1036 and dst_url.scheme == 'gs' # Compose is only for gs. | 1063 and dst_url.scheme == 'gs' # Compose is only for gs. |
1037 and not canned_acl) # TODO: Implement canned ACL support for compose. | 1064 and not canned_acl) # TODO: Implement canned ACL support for compose. |
1038 | 1065 |
1039 # Since parallel composite uploads are disabled by default, make user aware of | 1066 # Since parallel composite uploads are disabled by default, make user aware of |
1040 # them. | 1067 # them. |
1041 # TODO: Once compiled crcmod is being distributed by major Linux distributions | 1068 # TODO: Once compiled crcmod is being distributed by major Linux distributions |
1042 # remove this check. | 1069 # remove this check. |
1043 if (all_factors_but_size and parallel_composite_upload_threshold == 0 | 1070 if (all_factors_but_size and parallel_composite_upload_threshold == 0 |
1044 and file_size >= PARALLEL_COMPOSITE_SUGGESTION_THRESHOLD | 1071 and file_size >= PARALLEL_COMPOSITE_SUGGESTION_THRESHOLD): |
1045 and not suggested_parallel_composites): | 1072 with suggested_sliced_transfers_lock: |
1046 logger.info('\n'.join(textwrap.wrap( | 1073 if not suggested_sliced_transfers.get('suggested'): |
1047 '==> NOTE: You are uploading one or more large file(s), which would ' | 1074 logger.info('\n'.join(textwrap.wrap( |
1048 'run significantly faster if you enable parallel composite uploads. ' | 1075 '==> NOTE: You are uploading one or more large file(s), which ' |
1049 'This feature can be enabled by editing the ' | 1076 'would run significantly faster if you enable parallel composite ' |
1050 '"parallel_composite_upload_threshold" value in your .boto ' | 1077 'uploads. This feature can be enabled by editing the ' |
1051 'configuration file. However, note that if you do this you and any ' | 1078 '"parallel_composite_upload_threshold" value in your .boto ' |
1052 'users that download such composite files will need to have a compiled ' | 1079 'configuration file. However, note that if you do this you and any ' |
1053 'crcmod installed (see "gsutil help crcmod").')) + '\n') | 1080 'users that download such composite files will need to have a ' |
1054 suggested_parallel_composites = True | 1081 'compiled crcmod installed (see "gsutil help crcmod").')) + '\n') |
| 1082 suggested_sliced_transfers['suggested'] = True |
1055 | 1083 |
1056 return (all_factors_but_size | 1084 return (all_factors_but_size |
1057 and parallel_composite_upload_threshold > 0 | 1085 and parallel_composite_upload_threshold > 0 |
1058 and file_size >= parallel_composite_upload_threshold) | 1086 and file_size >= parallel_composite_upload_threshold) |
1059 | 1087 |
1060 | 1088 |
1061 def ExpandUrlToSingleBlr(url_str, gsutil_api, debug, project_id, | 1089 def ExpandUrlToSingleBlr(url_str, gsutil_api, debug, project_id, |
1062 treat_nonexistent_object_as_subdir=False): | 1090 treat_nonexistent_object_as_subdir=False): |
1063 """Expands wildcard if present in url_str. | 1091 """Expands wildcard if present in url_str. |
1064 | 1092 |
(...skipping 32 matching lines...)
1097 storage_url = StorageUrlFromString(url_str) | 1125 storage_url = StorageUrlFromString(url_str) |
1098 | 1126 |
1099 # Handle non-wildcarded URL. | 1127 # Handle non-wildcarded URL. |
1100 if storage_url.IsFileUrl(): | 1128 if storage_url.IsFileUrl(): |
1101 return (storage_url, storage_url.IsDirectory()) | 1129 return (storage_url, storage_url.IsDirectory()) |
1102 | 1130 |
1103 # At this point we have a cloud URL. | 1131 # At this point we have a cloud URL. |
1104 if storage_url.IsBucket(): | 1132 if storage_url.IsBucket(): |
1105 return (storage_url, True) | 1133 return (storage_url, True) |
1106 | 1134 |
1107 # For object/prefix URLs check 3 cases: (a) if the name ends with '/' treat | 1135 # For object/prefix URLs, there are four cases that indicate the destination |
1108 # as a subdir; otherwise, use the wildcard iterator with url to | 1136 # is a cloud subdirectory; these are always considered to be an existing |
1109 # find if (b) there's a Prefix matching url, or (c) name is of form | 1137 # container. Checking each case allows gsutil to provide Unix-like |
1110 # dir_$folder$ (and in both these cases also treat dir as a subdir). | 1138 # destination folder semantics, but requires up to three HTTP calls, noted |
1111 # Cloud subdirs are always considered to be an existing container. | 1139 # below. |
| 1140 |
| 1141 # Case 1: If a placeholder object ending with '/' exists. |
1112 if IsCloudSubdirPlaceholder(storage_url): | 1142 if IsCloudSubdirPlaceholder(storage_url): |
1113 return (storage_url, True) | 1143 return (storage_url, True) |
1114 | 1144 |
1115 # Check for the special case where we have a folder marker object. | 1145 # HTTP call to make an eventually consistent check for a matching prefix, |
1116 folder_expansion = CreateWildcardIterator( | 1146 # _$folder$, or empty listing. |
1117 storage_url.versionless_url_string + '_$folder$', gsutil_api, | 1147 expansion_empty = True |
1118 debug=debug, project_id=project_id).IterAll( | 1148 list_iterator = gsutil_api.ListObjects( |
1119 bucket_listing_fields=['name']) | 1149 storage_url.bucket_name, prefix=storage_url.object_name, delimiter='/', |
1120 for blr in folder_expansion: | 1150 provider=storage_url.scheme, fields=['prefixes', 'items/name']) |
1121 return (storage_url, True) | 1151 for obj_or_prefix in list_iterator: |
| 1152 # To conserve HTTP calls for the common case, we make a single listing |
| 1153 # that covers prefixes and object names. Listing object names covers the |
| 1154 # _$folder$ case and the nonexistent-object-as-subdir case. However, if |
| 1155 # there are many existing objects for which the target URL is an exact |
| 1156 # prefix, this listing could be paginated and span multiple HTTP calls. |
 | 1157 # If this case becomes common, we could heuristically abort the |
| 1158 # listing operation after the first page of results and just query for the |
| 1159 # _$folder$ object directly using GetObjectMetadata. |
| 1160 expansion_empty = False |
1122 | 1161 |
1123 blr_expansion = CreateWildcardIterator(url_str, gsutil_api, | 1162 if obj_or_prefix.datatype == CloudApi.CsObjectOrPrefixType.PREFIX: |
1124 debug=debug, | 1163 # Case 2: If there is a matching prefix when listing the destination URL. |
1125 project_id=project_id).IterAll( | 1164 return (storage_url, True) |
1126 bucket_listing_fields=['name']) | 1165 elif (obj_or_prefix.datatype == CloudApi.CsObjectOrPrefixType.OBJECT and |
1127 expansion_empty = True | 1166 obj_or_prefix.data.name == storage_url.object_name + '_$folder$'): |
1128 for blr in blr_expansion: | 1167 # Case 3: If a placeholder object matching destination + _$folder$ |
1129 expansion_empty = False | 1168 # exists. |
1130 if blr.IsPrefix(): | |
1131 return (storage_url, True) | 1169 return (storage_url, True) |
1132 | 1170 |
1133 return (storage_url, | 1171 # Case 4: If no objects/prefixes matched, and nonexistent objects should be |
1134 expansion_empty and treat_nonexistent_object_as_subdir) | 1172 # treated as subdirectories. |
| 1173 return (storage_url, expansion_empty and treat_nonexistent_object_as_subdir) |
1135 | 1174 |
1136 | 1175 |
1137 def FixWindowsNaming(src_url, dst_url): | 1176 def FixWindowsNaming(src_url, dst_url): |
1138 """Translates Windows pathnames to cloud pathnames. | 1177 """Translates Windows pathnames to cloud pathnames. |
1139 | 1178 |
1140 Rewrites the destination URL built by ConstructDstUrl(). | 1179 Rewrites the destination URL built by ConstructDstUrl(). |
1141 | 1180 |
1142 Args: | 1181 Args: |
1143 src_url: Source StorageUrl to be copied. | 1182 src_url: Source StorageUrl to be copied. |
1144 dst_url: The destination StorageUrl built by ConstructDstUrl(). | 1183 dst_url: The destination StorageUrl built by ConstructDstUrl(). |
(...skipping 90 matching lines...)
1235 end_time = time.time() | 1274 end_time = time.time() |
1236 | 1275 |
1237 result_url = dst_url.Clone() | 1276 result_url = dst_url.Clone() |
1238 result_url.generation = GenerationFromUrlAndString(result_url, | 1277 result_url.generation = GenerationFromUrlAndString(result_url, |
1239 dst_obj.generation) | 1278 dst_obj.generation) |
1240 | 1279 |
1241 return (end_time - start_time, src_obj_metadata.size, result_url, | 1280 return (end_time - start_time, src_obj_metadata.size, result_url, |
1242 dst_obj.md5Hash) | 1281 dst_obj.md5Hash) |
1243 | 1282 |
1244 | 1283 |
1245 def _CheckFreeSpace(path): | |
1246 """Return path/drive free space (in bytes).""" | |
1247 if IS_WINDOWS: | |
1248 # pylint: disable=g-import-not-at-top | |
1249 try: | |
1250 # pylint: disable=invalid-name | |
1251 get_disk_free_space_ex = WINFUNCTYPE(c_int, c_wchar_p, | |
1252 POINTER(c_uint64), | |
1253 POINTER(c_uint64), | |
1254 POINTER(c_uint64)) | |
1255 get_disk_free_space_ex = get_disk_free_space_ex( | |
1256 ('GetDiskFreeSpaceExW', windll.kernel32), ( | |
1257 (1, 'lpszPathName'), | |
1258 (2, 'lpFreeUserSpace'), | |
1259 (2, 'lpTotalSpace'), | |
1260 (2, 'lpFreeSpace'),)) | |
1261 except AttributeError: | |
1262 get_disk_free_space_ex = WINFUNCTYPE(c_int, c_char_p, | |
1263 POINTER(c_uint64), | |
1264 POINTER(c_uint64), | |
1265 POINTER(c_uint64)) | |
1266 get_disk_free_space_ex = get_disk_free_space_ex( | |
1267 ('GetDiskFreeSpaceExA', windll.kernel32), ( | |
1268 (1, 'lpszPathName'), | |
1269 (2, 'lpFreeUserSpace'), | |
1270 (2, 'lpTotalSpace'), | |
1271 (2, 'lpFreeSpace'),)) | |
1272 | |
1273 def GetDiskFreeSpaceExErrCheck(result, unused_func, args): | |
1274 if not result: | |
1275 raise WinError() | |
1276 return args[1].value | |
1277 get_disk_free_space_ex.errcheck = GetDiskFreeSpaceExErrCheck | |
1278 | |
1279 return get_disk_free_space_ex(os.getenv('SystemDrive')) | |
1280 else: | |
1281 (_, f_frsize, _, _, f_bavail, _, _, _, _, _) = os.statvfs(path) | |
1282 return f_frsize * f_bavail | |
1283 | |
1284 | |
1285 def _SetContentTypeFromFile(src_url, dst_obj_metadata): | 1284 def _SetContentTypeFromFile(src_url, dst_obj_metadata): |
1286 """Detects and sets Content-Type if src_url names a local file. | 1285 """Detects and sets Content-Type if src_url names a local file. |
1287 | 1286 |
1288 Args: | 1287 Args: |
1289 src_url: Source StorageUrl. | 1288 src_url: Source StorageUrl. |
1290 dst_obj_metadata: Object-specific metadata that should be overridden during | 1289 dst_obj_metadata: Object-specific metadata that should be overridden during |
1291 the copy. | 1290 the copy. |
1292 """ | 1291 """ |
1293 # contentType == '' if user requested default type. | 1292 # contentType == '' if user requested default type. |
1294 if (dst_obj_metadata.contentType is None and src_url.IsFileUrl() | 1293 if (dst_obj_metadata.contentType is None and src_url.IsFileUrl() |
(...skipping 207 matching lines...)
1502 # TODO: Compress using a streaming model as opposed to all at once here. | 1501 # TODO: Compress using a streaming model as opposed to all at once here. |
1503 if src_obj_size >= MIN_SIZE_COMPUTE_LOGGING: | 1502 if src_obj_size >= MIN_SIZE_COMPUTE_LOGGING: |
1504 logger.info( | 1503 logger.info( |
1505 'Compressing %s (to tmp)...', src_url) | 1504 'Compressing %s (to tmp)...', src_url) |
1506 (gzip_fh, gzip_path) = tempfile.mkstemp() | 1505 (gzip_fh, gzip_path) = tempfile.mkstemp() |
1507 gzip_fp = None | 1506 gzip_fp = None |
1508 try: | 1507 try: |
1509 # Check for temp space. Assume the compressed object is at most 2x | 1508 # Check for temp space. Assume the compressed object is at most 2x |
1510 # the size of the object (normally should compress to smaller than | 1509 # the size of the object (normally should compress to smaller than |
1511 # the object) | 1510 # the object) |
1512 if _CheckFreeSpace(gzip_path) < 2*int(src_obj_size): | 1511 if CheckFreeSpace(gzip_path) < 2*int(src_obj_size): |
1513 raise CommandException('Inadequate temp space available to compress ' | 1512 raise CommandException('Inadequate temp space available to compress ' |
1514 '%s. See the CHANGING TEMP DIRECTORIES section ' | 1513 '%s. See the CHANGING TEMP DIRECTORIES section ' |
1515 'of "gsutil help cp" for more info.' % src_url) | 1514 'of "gsutil help cp" for more info.' % src_url) |
1516 gzip_fp = gzip.open(gzip_path, 'wb') | 1515 gzip_fp = gzip.open(gzip_path, 'wb') |
1517 data = src_obj_filestream.read(GZIP_CHUNK_SIZE) | 1516 data = src_obj_filestream.read(GZIP_CHUNK_SIZE) |
1518 while data: | 1517 while data: |
1519 gzip_fp.write(data) | 1518 gzip_fp.write(data) |
1520 data = src_obj_filestream.read(GZIP_CHUNK_SIZE) | 1519 data = src_obj_filestream.read(GZIP_CHUNK_SIZE) |
1521 finally: | 1520 finally: |
1522 if gzip_fp: | 1521 if gzip_fp: |
(...skipping 127 matching lines...)
1650 result_url = dst_url.Clone() | 1649 result_url = dst_url.Clone() |
1651 | 1650 |
1652 result_url.generation = uploaded_object.generation | 1651 result_url.generation = uploaded_object.generation |
1653 result_url.generation = GenerationFromUrlAndString( | 1652 result_url.generation = GenerationFromUrlAndString( |
1654 result_url, uploaded_object.generation) | 1653 result_url, uploaded_object.generation) |
1655 | 1654 |
1656 return (elapsed_time, uploaded_object.size, result_url, | 1655 return (elapsed_time, uploaded_object.size, result_url, |
1657 uploaded_object.md5Hash) | 1656 uploaded_object.md5Hash) |
1658 | 1657 |
1659 | 1658 |
1660 # TODO: Refactor this long function into smaller pieces. | 1659 def _GetDownloadFile(dst_url, src_obj_metadata, logger): |
1661 # pylint: disable=too-many-statements | 1660 """Creates a new download file, and deletes the file that will be replaced. |
1662 def _DownloadObjectToFile(src_url, src_obj_metadata, dst_url, | 1661 |
1663 gsutil_api, logger, test_method=None): | 1662 Names and creates a temporary file for this download. Also, if there is an |
1664 """Downloads an object to a local file. | 1663 existing file at the path where this file will be placed after the download |
| 1664 is completed, that file will be deleted. |
1665 | 1665 |
1666 Args: | 1666 Args: |
1667 src_url: Source CloudUrl. | 1667 dst_url: Destination FileUrl. |
1668 src_obj_metadata: Metadata from the source object. | 1668 src_obj_metadata: Metadata from the source object. |
1669 dst_url: Destination FileUrl. | |
1670 gsutil_api: gsutil Cloud API instance to use for the download. | |
1671 logger: for outputting log messages. | 1669 logger: for outputting log messages. |
1672 test_method: Optional test method for modifying the file before validation | 1670 |
1673 during unit tests. | |
1674 Returns: | 1671 Returns: |
1675 (elapsed_time, bytes_transferred, dst_url, md5), excluding overhead like | 1672 (download_file_name, need_to_unzip) |
1676 initial GET. | 1673 download_file_name: The name of the temporary file to which the object will |
1677 | 1674 be downloaded. |
1678 Raises: | 1675 need_to_unzip: If true, a temporary zip file was used and must be |
1679 CommandException: if errors encountered. | 1676 uncompressed as part of validation. |
1680 """ | 1677 """ |
1681 global open_files_map | 1678 dir_name = os.path.dirname(dst_url.object_name) |
1682 file_name = dst_url.object_name | |
1683 dir_name = os.path.dirname(file_name) | |
1684 if dir_name and not os.path.exists(dir_name): | 1679 if dir_name and not os.path.exists(dir_name): |
1685 # Do dir creation in try block so can ignore case where dir already | 1680 # Do dir creation in try block so can ignore case where dir already |
1686 # exists. This is needed to avoid a race condition when running gsutil | 1681 # exists. This is needed to avoid a race condition when running gsutil |
1687 # -m cp. | 1682 # -m cp. |
1688 try: | 1683 try: |
1689 os.makedirs(dir_name) | 1684 os.makedirs(dir_name) |
1690 except OSError, e: | 1685 except OSError, e: |
1691 if e.errno != errno.EEXIST: | 1686 if e.errno != errno.EEXIST: |
1692 raise | 1687 raise |
1693 api_selector = gsutil_api.GetApiSelector(provider=src_url.scheme) | 1688 |
| 1689 need_to_unzip = False |
1694 # For gzipped objects download to a temp file and unzip. For the XML API, | 1690 # For gzipped objects download to a temp file and unzip. For the XML API, |
1695 # the represents the result of a HEAD request. For the JSON API, this is | 1691 # this represents the result of a HEAD request. For the JSON API, this is |
1696 # the stored encoding which the service may not respect. However, if the | 1692 # the stored encoding which the service may not respect. However, if the |
1697 # server sends decompressed bytes for a file that is stored compressed | 1693 # server sends decompressed bytes for a file that is stored compressed |
1698 # (double compressed case), there is no way we can validate the hash and | 1694 # (double compressed case), there is no way we can validate the hash and |
1699 # we will fail our hash check for the object. | 1695 # we will fail our hash check for the object. |
1700 if (src_obj_metadata.contentEncoding and | 1696 if (src_obj_metadata.contentEncoding and |
1701 src_obj_metadata.contentEncoding.lower().endswith('gzip')): | 1697 src_obj_metadata.contentEncoding.lower().endswith('gzip')): |
1702 # We can't use tempfile.mkstemp() here because we need a predictable | 1698 need_to_unzip = True |
1703 # filename for resumable downloads. | 1699 download_file_name = _GetDownloadTempZipFileName(dst_url) |
1704 download_file_name = _GetDownloadZipFileName(file_name) | |
1705 logger.info( | 1700 logger.info( |
1706 'Downloading to temp gzip filename %s', download_file_name) | 1701 'Downloading to temp gzip filename %s', download_file_name) |
1707 need_to_unzip = True | |
1708 else: | 1702 else: |
1709 download_file_name = file_name | 1703 download_file_name = _GetDownloadTempFileName(dst_url) |
1710 need_to_unzip = False | 1704 |
1711 | 1705 # If a file exists at the permanent destination (where the file will be moved |
1712 if download_file_name.endswith(dst_url.delim): | 1706 # after the download is completed), delete it here to reduce disk space |
1713 logger.warn('\n'.join(textwrap.wrap( | 1707 # requirements. |
1714 'Skipping attempt to download to filename ending with slash (%s). This ' | 1708 if os.path.exists(dst_url.object_name): |
1715 'typically happens when using gsutil to download from a subdirectory ' | 1709 os.unlink(dst_url.object_name) |
1716 'created by the Cloud Console (https://cloud.google.com/console)' | 1710 |
1717 % download_file_name))) | 1711 # Downloads open the temporary download file in r+b mode, which requires it |
1718 return (0, 0, dst_url, '') | 1712 # to already exist, so we create it here if it doesn't exist already. |
1719 | 1713 fp = open(download_file_name, 'ab') |
1720 # Set up hash digesters. | 1714 fp.close() |
| 1715 return download_file_name, need_to_unzip |
| 1716 |
| 1717 |
| 1718 def _ShouldDoSlicedDownload(download_strategy, src_obj_metadata, |
| 1719 allow_splitting, logger): |
| 1720 """Determines whether the sliced download strategy should be used. |
| 1721 |
| 1722 Args: |
| 1723 download_strategy: CloudApi download strategy. |
| 1724 src_obj_metadata: Metadata from the source object. |
| 1725 allow_splitting: If false, then this function returns false. |
| 1726 logger: logging.Logger for log message output. |
| 1727 |
| 1728 Returns: |
| 1729 True iff a sliced download should be performed on the source file. |
| 1730 """ |
| 1731 sliced_object_download_threshold = HumanReadableToBytes(config.get( |
| 1732 'GSUtil', 'sliced_object_download_threshold', |
| 1733 DEFAULT_SLICED_OBJECT_DOWNLOAD_THRESHOLD)) |
| 1734 |
| 1735 max_components = config.getint( |
| 1736 'GSUtil', 'sliced_object_download_max_components', |
| 1737 DEFAULT_SLICED_OBJECT_DOWNLOAD_MAX_COMPONENTS) |
| 1738 |
| 1739 # Don't use sliced download if it will prevent us from performing an |
| 1740 # integrity check. |
| 1741 check_hashes_config = config.get( |
| 1742 'GSUtil', 'check_hashes', CHECK_HASH_IF_FAST_ELSE_FAIL) |
| 1743 parallel_hashing = src_obj_metadata.crc32c and UsingCrcmodExtension(crcmod) |
| 1744 hashing_okay = parallel_hashing or check_hashes_config == CHECK_HASH_NEVER |
| 1745 |
| 1746 use_slice = ( |
| 1747 allow_splitting |
| 1748 and download_strategy is not CloudApi.DownloadStrategy.ONE_SHOT |
| 1749 and max_components > 1 |
| 1750 and hashing_okay |
| 1751 and sliced_object_download_threshold > 0 |
| 1752 and src_obj_metadata.size >= sliced_object_download_threshold) |
| 1753 |
| 1754 if (not use_slice |
| 1755 and src_obj_metadata.size >= PARALLEL_COMPOSITE_SUGGESTION_THRESHOLD |
| 1756 and not UsingCrcmodExtension(crcmod) |
| 1757 and check_hashes_config != CHECK_HASH_NEVER): |
| 1758 with suggested_sliced_transfers_lock: |
| 1759 if not suggested_sliced_transfers.get('suggested'): |
| 1760 logger.info('\n'.join(textwrap.wrap( |
| 1761 '==> NOTE: You are downloading one or more large file(s), which ' |
| 1762 'would run significantly faster if you enabled sliced object ' |
 | 1763 'downloads. This feature is enabled by default but requires that ' |
| 1764 'compiled crcmod be installed (see "gsutil help crcmod").')) + '\n') |
| 1765 suggested_sliced_transfers['suggested'] = True |
| 1766 |
| 1767 return use_slice |
| 1768 |
| 1769 |
| 1770 def _PerformSlicedDownloadObjectToFile(cls, args, thread_state=None): |
| 1771 """Function argument to Apply for performing sliced downloads. |
| 1772 |
| 1773 Args: |
| 1774 cls: Calling Command class. |
| 1775 args: PerformSlicedDownloadObjectToFileArgs tuple describing the target. |
| 1776 thread_state: gsutil Cloud API instance to use for the operation. |
| 1777 |
| 1778 Returns: |
| 1779 PerformSlicedDownloadReturnValues named-tuple filled with: |
| 1780 component_num: The component number for this download. |
| 1781 crc32c: CRC32C hash value (integer) of the downloaded bytes. |
| 1782 bytes_transferred: The number of bytes transferred, potentially less |
| 1783 than the component size if the download was resumed. |
| 1784 """ |
| 1785 gsutil_api = GetCloudApiInstance(cls, thread_state=thread_state) |
1721 hash_algs = GetDownloadHashAlgs( | 1786 hash_algs = GetDownloadHashAlgs( |
1722 logger, src_has_md5=src_obj_metadata.md5Hash, | 1787 cls.logger, consider_crc32c=args.src_obj_metadata.crc32c) |
1723 src_has_crc32c=src_obj_metadata.crc32c) | |
1724 digesters = dict((alg, hash_algs[alg]()) for alg in hash_algs or {}) | 1788 digesters = dict((alg, hash_algs[alg]()) for alg in hash_algs or {}) |
1725 | 1789 |
1726 fp = None | 1790 (bytes_transferred, server_encoding) = ( |
1727 # Tracks whether the server used a gzip encoding. | 1791 _DownloadObjectToFileResumable(args.src_url, args.src_obj_metadata, |
| 1792 args.dst_url, args.download_file_name, |
| 1793 gsutil_api, cls.logger, digesters, |
| 1794 component_num=args.component_num, |
| 1795 start_byte=args.start_byte, |
| 1796 end_byte=args.end_byte)) |
| 1797 |
| 1798 crc32c_val = None |
| 1799 if 'crc32c' in digesters: |
| 1800 crc32c_val = digesters['crc32c'].crcValue |
| 1801 return PerformSlicedDownloadReturnValues( |
| 1802 args.component_num, crc32c_val, bytes_transferred, server_encoding) |
| 1803 |
| 1804 |
| 1805 def _MaintainSlicedDownloadTrackerFiles(src_obj_metadata, dst_url, |
| 1806 download_file_name, logger, |
| 1807 api_selector, num_components): |
| 1808 """Maintains sliced download tracker files in order to permit resumability. |
| 1809 |
| 1810 Reads or creates a sliced download tracker file representing this object |
| 1811 download. Upon an attempt at cross-process resumption, the contents of the |
| 1812 sliced download tracker file are verified to make sure a resumption is |
| 1813 possible and appropriate. In the case that a resumption should not be |
| 1814 attempted, existing component tracker files are deleted (to prevent child |
| 1815 processes from attempting resumption), and a new sliced download tracker |
| 1816 file is created. |
| 1817 |
| 1818 Args: |
| 1819 src_obj_metadata: Metadata from the source object. Must include etag and |
| 1820 generation. |
| 1821 dst_url: Destination FileUrl. |
| 1822 download_file_name: Temporary file name to be used for the download. |
| 1823 logger: for outputting log messages. |
| 1824 api_selector: The Cloud API implementation used. |
| 1825 num_components: The number of components to perform this download with. |
| 1826 """ |
| 1827 assert src_obj_metadata.etag |
 | 1828 fp = tracker_file = None |
| 1829 |
| 1830 # Only can happen if the resumable threshold is set higher than the |
| 1831 # parallel transfer threshold. |
| 1832 if src_obj_metadata.size < ResumableThreshold(): |
| 1833 return |
| 1834 |
| 1835 tracker_file_name = GetTrackerFilePath(dst_url, |
| 1836 TrackerFileType.SLICED_DOWNLOAD, |
| 1837 api_selector) |
| 1838 |
| 1839 # Check to see if we should attempt resuming the download. |
| 1840 try: |
| 1841 fp = open(download_file_name, 'rb') |
| 1842 existing_file_size = GetFileSize(fp) |
| 1843 # A parallel resumption should be attempted only if the destination file |
| 1844 # size is exactly the same as the source size and the tracker file matches. |
| 1845 if existing_file_size == src_obj_metadata.size: |
| 1846 tracker_file = open(tracker_file_name, 'r') |
| 1847 tracker_file_data = json.load(tracker_file) |
| 1848 if (tracker_file_data['etag'] == src_obj_metadata.etag and |
| 1849 tracker_file_data['generation'] == src_obj_metadata.generation and |
| 1850 tracker_file_data['num_components'] == num_components): |
| 1851 return |
| 1852 else: |
| 1853 tracker_file.close() |
| 1854 logger.warn('Sliced download tracker file doesn\'t match for ' |
| 1855 'download of %s. Restarting download from scratch.' % |
| 1856 dst_url.object_name) |
| 1857 |
| 1858 except (IOError, ValueError) as e: |
| 1859 # Ignore non-existent file (happens first time a download |
| 1860 # is attempted on an object), but warn user for other errors. |
| 1861 if isinstance(e, ValueError) or e.errno != errno.ENOENT: |
| 1862 logger.warn('Couldn\'t read sliced download tracker file (%s): %s. ' |
| 1863 'Restarting download from scratch.' % |
| 1864 (tracker_file_name, str(e))) |
| 1865 finally: |
| 1866 if fp: |
| 1867 fp.close() |
| 1868 if tracker_file: |
| 1869 tracker_file.close() |
| 1870 |
| 1871 # Delete component tracker files to guarantee download starts from scratch. |
| 1872 DeleteDownloadTrackerFiles(dst_url, api_selector) |
| 1873 |
| 1874 # Create a new sliced download tracker file to represent this download. |
| 1875 try: |
| 1876 with open(tracker_file_name, 'w') as tracker_file: |
| 1877 tracker_file_data = {'etag': src_obj_metadata.etag, |
| 1878 'generation': src_obj_metadata.generation, |
| 1879 'num_components': num_components} |
| 1880 tracker_file.write(json.dumps(tracker_file_data)) |
| 1881 except IOError as e: |
| 1882 RaiseUnwritableTrackerFileException(tracker_file_name, e.strerror) |
| 1883 |
| 1884 |
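
For orientation, the sliced download tracker file created above is a single JSON object holding exactly the three fields checked during resumption. The sketch below shows that check in isolation; the tracker_matches helper and the literal path are hypothetical, while the field names and the real path helper (GetTrackerFilePath) come from the code above.

import json

def tracker_matches(tracker_path, etag, generation, num_components):
  # Mirrors the resumption test above: the download may resume only if all
  # three recorded fields match the object about to be downloaded.
  try:
    with open(tracker_path, 'r') as f:
      data = json.load(f)
  except (IOError, ValueError):
    return False
  return (data.get('etag') == etag and
          data.get('generation') == generation and
          data.get('num_components') == num_components)

# Illustrative values; real tracker paths come from GetTrackerFilePath().
can_resume = tracker_matches('/tmp/example.sliced_download', 'etag123', 3, 4)
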
| 1885 class SlicedDownloadFileWrapper(object): |
| 1886 """Wraps a file object to be used in GetObjectMedia for sliced downloads. |
| 1887 |
| 1888 In order to allow resumability, the file object used by each thread in a |
| 1889 sliced object download should be wrapped using SlicedDownloadFileWrapper. |
| 1890 Passing a SlicedDownloadFileWrapper object to GetObjectMedia will allow the |
| 1891 download component tracker file for this component to be updated periodically, |
 | 1892 while the downloaded bytes are written to the file as usual.
| 1893 """ |
| 1894 |
| 1895 def __init__(self, fp, tracker_file_name, src_obj_metadata, start_byte, |
| 1896 end_byte): |
| 1897 """Initializes the SlicedDownloadFileWrapper. |
| 1898 |
| 1899 Args: |
| 1900 fp: The already-open file object to be used for writing in |
| 1901 GetObjectMedia. Data will be written to file starting at the current |
| 1902 seek position. |
| 1903 tracker_file_name: The name of the tracker file for this component. |
| 1904 src_obj_metadata: Metadata from the source object. Must include etag and |
| 1905 generation. |
| 1906 start_byte: The first byte to be downloaded for this parallel component. |
| 1907 end_byte: The last byte to be downloaded for this parallel component. |
| 1908 """ |
| 1909 self._orig_fp = fp |
| 1910 self._tracker_file_name = tracker_file_name |
| 1911 self._src_obj_metadata = src_obj_metadata |
| 1912 self._last_tracker_file_byte = None |
| 1913 self._start_byte = start_byte |
| 1914 self._end_byte = end_byte |
| 1915 |
| 1916 def write(self, data): # pylint: disable=invalid-name |
| 1917 current_file_pos = self._orig_fp.tell() |
| 1918 assert (self._start_byte <= current_file_pos and |
| 1919 current_file_pos + len(data) <= self._end_byte + 1) |
| 1920 |
| 1921 self._orig_fp.write(data) |
| 1922 current_file_pos = self._orig_fp.tell() |
| 1923 |
| 1924 threshold = TRACKERFILE_UPDATE_THRESHOLD |
| 1925 if (self._last_tracker_file_byte is None or |
| 1926 current_file_pos - self._last_tracker_file_byte > threshold or |
| 1927 current_file_pos == self._end_byte + 1): |
| 1928 WriteDownloadComponentTrackerFile( |
| 1929 self._tracker_file_name, self._src_obj_metadata, current_file_pos) |
| 1930 self._last_tracker_file_byte = current_file_pos |
| 1931 |
| 1932 def seek(self, offset, whence=os.SEEK_SET): # pylint: disable=invalid-name |
| 1933 if whence == os.SEEK_END: |
| 1934 self._orig_fp.seek(offset + self._end_byte + 1) |
| 1935 else: |
| 1936 self._orig_fp.seek(offset, whence) |
| 1937 assert self._start_byte <= self._orig_fp.tell() <= self._end_byte + 1 |
| 1938 |
| 1939 def tell(self): # pylint: disable=invalid-name |
| 1940 return self._orig_fp.tell() |
| 1941 |
| 1942 def flush(self): # pylint: disable=invalid-name |
| 1943 self._orig_fp.flush() |
| 1944 |
| 1945 def close(self): # pylint: disable=invalid-name |
| 1946 if self._orig_fp: |
| 1947 self._orig_fp.close() |
| 1948 |
| 1949 |
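
A toy, self-contained version of the bookkeeping SlicedDownloadFileWrapper.write() performs may make the class easier to follow. RangeTrackedWriter and TRACKER_UPDATE_THRESHOLD below are illustrative stand-ins (gsutil's real threshold is TRACKERFILE_UPDATE_THRESHOLD, and the real wrapper persists progress via WriteDownloadComponentTrackerFile).

import tempfile

TRACKER_UPDATE_THRESHOLD = 1024  # illustrative value

class RangeTrackedWriter(object):
  """Keeps writes inside an inclusive byte range and records progress."""

  def __init__(self, fp, start_byte, end_byte):
    self._fp = fp
    self._start_byte = start_byte
    self._end_byte = end_byte
    self._last_recorded = None

  def write(self, data):
    pos = self._fp.tell()
    # Writes must stay inside [start_byte, end_byte].
    assert self._start_byte <= pos and pos + len(data) <= self._end_byte + 1
    self._fp.write(data)
    pos = self._fp.tell()
    if (self._last_recorded is None or
        pos - self._last_recorded > TRACKER_UPDATE_THRESHOLD or
        pos == self._end_byte + 1):
      # The real wrapper writes the component tracker file at this point.
      self._last_recorded = pos

with tempfile.TemporaryFile() as fp:
  writer = RangeTrackedWriter(fp, 0, 4095)
  writer.write(b'\0' * 4096)  # fills the slice and records final progress
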
| 1950 def _PartitionObject(src_url, src_obj_metadata, dst_url, |
| 1951 download_file_name): |
| 1952 """Partitions an object into components to be downloaded. |
| 1953 |
| 1954 Each component is a byte range of the object. The byte ranges |
| 1955 of the returned components are mutually exclusive and collectively |
| 1956 exhaustive. The byte ranges are inclusive at both end points. |
| 1957 |
| 1958 Args: |
| 1959 src_url: Source CloudUrl. |
| 1960 src_obj_metadata: Metadata from the source object. |
| 1961 dst_url: Destination FileUrl. |
| 1962 download_file_name: Temporary file name to be used for the download. |
| 1963 |
| 1964 Returns: |
| 1965 components_to_download: A list of PerformSlicedDownloadObjectToFileArgs |
| 1966 to be used in Apply for the sliced download. |
| 1967 """ |
| 1968 sliced_download_component_size = HumanReadableToBytes( |
| 1969 config.get('GSUtil', 'sliced_object_download_component_size', |
| 1970 DEFAULT_SLICED_OBJECT_DOWNLOAD_COMPONENT_SIZE)) |
| 1971 |
| 1972 max_components = config.getint( |
| 1973 'GSUtil', 'sliced_object_download_max_components', |
| 1974 DEFAULT_SLICED_OBJECT_DOWNLOAD_MAX_COMPONENTS) |
| 1975 |
| 1976 num_components, component_size = _GetPartitionInfo( |
| 1977 src_obj_metadata.size, max_components, sliced_download_component_size) |
| 1978 |
| 1979 components_to_download = [] |
| 1980 component_lengths = [] |
| 1981 for i in range(num_components): |
| 1982 start_byte = i * component_size |
| 1983 end_byte = min((i + 1) * (component_size) - 1, src_obj_metadata.size - 1) |
| 1984 component_lengths.append(end_byte - start_byte + 1) |
| 1985 components_to_download.append( |
| 1986 PerformSlicedDownloadObjectToFileArgs( |
| 1987 i, src_url, src_obj_metadata, dst_url, download_file_name, |
| 1988 start_byte, end_byte)) |
| 1989 return components_to_download, component_lengths |
| 1990 |
| 1991 |
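
As a worked example of the inclusive byte ranges produced above, a 100-byte object split into 30-byte components yields [0, 29], [30, 59], [60, 89], [90, 99]. The helper below is a simplified, hypothetical restatement of that loop (it ignores the max_components clamp applied by _GetPartitionInfo).

def partition_ranges(object_size, component_size):
  # Mutually exclusive, collectively exhaustive, inclusive at both end points.
  num_components = -(-object_size // component_size)  # ceiling division
  ranges = []
  for i in range(num_components):
    start_byte = i * component_size
    end_byte = min((i + 1) * component_size - 1, object_size - 1)
    ranges.append((start_byte, end_byte))
  return ranges

assert partition_ranges(100, 30) == [(0, 29), (30, 59), (60, 89), (90, 99)]
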
| 1992 def _DoSlicedDownload(src_url, src_obj_metadata, dst_url, download_file_name, |
| 1993 command_obj, logger, copy_exception_handler, |
| 1994 api_selector): |
| 1995 """Downloads a cloud object to a local file using sliced download. |
| 1996 |
| 1997 Byte ranges are decided for each thread/process, and then the parts are |
| 1998 downloaded in parallel. |
| 1999 |
| 2000 Args: |
| 2001 src_url: Source CloudUrl. |
| 2002 src_obj_metadata: Metadata from the source object. |
| 2003 dst_url: Destination FileUrl. |
| 2004 download_file_name: Temporary file name to be used for download. |
 | 2005     command_obj: command object for use in Apply for sliced object downloads.
| 2006 logger: for outputting log messages. |
| 2007 copy_exception_handler: For handling copy exceptions during Apply. |
| 2008 api_selector: The Cloud API implementation used. |
| 2009 |
| 2010 Returns: |
| 2011 (bytes_transferred, crc32c) |
| 2012 bytes_transferred: Number of bytes transferred from server this call. |
| 2013 crc32c: a crc32c hash value (integer) for the downloaded bytes, or None if |
| 2014 crc32c hashing wasn't performed. |
| 2015 """ |
| 2016 components_to_download, component_lengths = _PartitionObject( |
| 2017 src_url, src_obj_metadata, dst_url, download_file_name) |
| 2018 |
| 2019 num_components = len(components_to_download) |
| 2020 _MaintainSlicedDownloadTrackerFiles(src_obj_metadata, dst_url, |
| 2021 download_file_name, logger, |
| 2022 api_selector, num_components) |
| 2023 |
| 2024 # Resize the download file so each child process can seek to its start byte. |
| 2025 with open(download_file_name, 'ab') as fp: |
| 2026 fp.truncate(src_obj_metadata.size) |
| 2027 |
| 2028 cp_results = command_obj.Apply( |
| 2029 _PerformSlicedDownloadObjectToFile, components_to_download, |
| 2030 copy_exception_handler, arg_checker=gslib.command.DummyArgChecker, |
| 2031 parallel_operations_override=True, should_return_results=True) |
| 2032 |
| 2033 if len(cp_results) < num_components: |
| 2034 raise CommandException( |
| 2035 'Some components of %s were not downloaded successfully. ' |
| 2036 'Please retry this download.' % dst_url.object_name) |
| 2037 |
| 2038 # Crc32c hashes have to be concatenated in the correct order. |
| 2039 cp_results = sorted(cp_results, key=attrgetter('component_num')) |
| 2040 crc32c = cp_results[0].crc32c |
| 2041 if crc32c is not None: |
| 2042 for i in range(1, num_components): |
| 2043 crc32c = ConcatCrc32c(crc32c, cp_results[i].crc32c, |
| 2044 component_lengths[i]) |
| 2045 |
| 2046 bytes_transferred = 0 |
| 2047 expect_gzip = (src_obj_metadata.contentEncoding and |
| 2048 src_obj_metadata.contentEncoding.lower().endswith('gzip')) |
| 2049 for cp_result in cp_results: |
| 2050 bytes_transferred += cp_result.bytes_transferred |
| 2051 server_gzip = (cp_result.server_encoding and |
| 2052 cp_result.server_encoding.lower().endswith('gzip')) |
| 2053 # If the server gzipped any components on the fly, we will have no chance of |
| 2054 # properly reconstructing the file. |
| 2055 if server_gzip and not expect_gzip: |
| 2056 raise CommandException( |
| 2057 'Download of %s failed because the server sent back data with an ' |
| 2058 'unexpected encoding.' % dst_url.object_name) |
| 2059 |
| 2060 return bytes_transferred, crc32c |
| 2061 |
| 2062 |
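
The truncate() call in _DoSlicedDownload grows the temporary file to the full object size up front so that every child process can seek() directly to its start byte. A standalone sketch of that preallocation step, with an illustrative file name:

import os

def preallocate(path, size):
  # Append mode avoids clobbering bytes written by a previous attempt;
  # truncate(size) then extends (or trims) the file to the object size.
  with open(path, 'ab') as fp:
    fp.truncate(size)

preallocate('sliced_download.tmp', 16 * 1024 * 1024)
assert os.path.getsize('sliced_download.tmp') == 16 * 1024 * 1024
os.remove('sliced_download.tmp')
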
| 2063 def _DownloadObjectToFileResumable(src_url, src_obj_metadata, dst_url, |
| 2064 download_file_name, gsutil_api, logger, |
| 2065 digesters, component_num=None, start_byte=0, |
| 2066 end_byte=None): |
| 2067 """Downloads an object to a local file using the resumable strategy. |
| 2068 |
| 2069 Args: |
| 2070 src_url: Source CloudUrl. |
| 2071 src_obj_metadata: Metadata from the source object. |
| 2072 dst_url: Destination FileUrl. |
| 2073 download_file_name: Temporary file name to be used for download. |
| 2074 gsutil_api: gsutil Cloud API instance to use for the download. |
| 2075 logger: for outputting log messages. |
| 2076 digesters: Digesters corresponding to the hash algorithms that will be used |
| 2077 for validation. |
| 2078 component_num: Which component of a sliced download this call is for, or |
| 2079 None if this is not a sliced download. |
| 2080 start_byte: The first byte of a byte range for a sliced download. |
| 2081 end_byte: The last byte of a byte range for a sliced download. |
| 2082 |
| 2083 Returns: |
| 2084 (bytes_transferred, server_encoding) |
| 2085 bytes_transferred: Number of bytes transferred from server this call. |
| 2086 server_encoding: Content-encoding string if it was detected that the server |
| 2087 sent encoded bytes during transfer, None otherwise. |
| 2088 """ |
| 2089 if end_byte is None: |
| 2090 end_byte = src_obj_metadata.size - 1 |
| 2091 download_size = end_byte - start_byte + 1 |
| 2092 |
| 2093 is_sliced = component_num is not None |
| 2094 api_selector = gsutil_api.GetApiSelector(provider=src_url.scheme) |
1728 server_encoding = None | 2095 server_encoding = None |
1729 download_complete = False | 2096 |
1730 download_strategy = _SelectDownloadStrategy(dst_url) | 2097 # Used for logging |
1731 download_start_point = 0 | 2098 download_name = dst_url.object_name |
1732 # This is used for resuming downloads, but also for passing the mediaLink | 2099 if is_sliced: |
1733 # and size into the download for new downloads so that we can avoid | 2100 download_name += ' component %d' % component_num |
1734 # making an extra HTTP call. | 2101 |
1735 serialization_data = None | |
1736 serialization_dict = GetDownloadSerializationDict(src_obj_metadata) | |
1737 open_files = [] | |
1738 try: | 2102 try: |
1739 if download_strategy is CloudApi.DownloadStrategy.ONE_SHOT: | 2103 fp = open(download_file_name, 'r+b') |
1740 fp = open(download_file_name, 'wb') | 2104 fp.seek(start_byte) |
1741 elif download_strategy is CloudApi.DownloadStrategy.RESUMABLE: | 2105 api_selector = gsutil_api.GetApiSelector(provider=src_url.scheme) |
1742 # If this is a resumable download, we need to open the file for append and | 2106 existing_file_size = GetFileSize(fp) |
1743 # manage a tracker file. | 2107 |
1744 if open_files_map.get(download_file_name, False): | 2108 tracker_file_name, download_start_byte = ( |
1745 # Ensure another process/thread is not already writing to this file. | 2109 ReadOrCreateDownloadTrackerFile(src_obj_metadata, dst_url, logger, |
1746 raise FileConcurrencySkipError | 2110 api_selector, start_byte, |
1747 open_files.append(download_file_name) | 2111 existing_file_size, component_num)) |
1748 open_files_map[download_file_name] = True | 2112 |
1749 fp = open(download_file_name, 'ab') | 2113 if download_start_byte < start_byte or download_start_byte > end_byte + 1: |
1750 | 2114 DeleteTrackerFile(tracker_file_name) |
1751 resuming = ReadOrCreateDownloadTrackerFile( | 2115 raise CommandException( |
1752 src_obj_metadata, dst_url, api_selector) | 2116 'Resumable download start point for %s is not in the correct byte ' |
1753 if resuming: | 2117 'range. Deleting tracker file, so if you re-try this download it ' |
1754 # Find out how far along we are so we can request the appropriate | 2118 'will start from scratch' % download_name) |
1755 # remaining range of the object. | 2119 |
1756 existing_file_size = GetFileSize(fp, position_to_eof=True) | 2120 download_complete = (download_start_byte == start_byte + download_size) |
1757 if existing_file_size > src_obj_metadata.size: | 2121 resuming = (download_start_byte != start_byte) and not download_complete |
1758 DeleteTrackerFile(GetTrackerFilePath( | 2122 if resuming: |
1759 dst_url, TrackerFileType.DOWNLOAD, api_selector)) | 2123 logger.info('Resuming download for %s', download_name) |
1760 raise CommandException( | 2124 elif download_complete: |
1761 '%s is larger (%d) than %s (%d).\nDeleting tracker file, so ' | 2125 logger.info( |
1762 'if you re-try this download it will start from scratch' % | 2126 'Download already complete for %s, skipping download but ' |
1763 (download_file_name, existing_file_size, src_url.object_name, | 2127 'will run integrity checks.', download_name) |
1764 src_obj_metadata.size)) | 2128 |
1765 else: | 2129 # This is used for resuming downloads, but also for passing the mediaLink |
1766 if existing_file_size == src_obj_metadata.size: | 2130 # and size into the download for new downloads so that we can avoid |
1767 logger.info( | 2131 # making an extra HTTP call. |
1768 'Download already complete for file %s, skipping download but ' | 2132 serialization_data = GetDownloadSerializationData( |
1769 'will run integrity checks.', download_file_name) | 2133 src_obj_metadata, progress=download_start_byte) |
1770 download_complete = True | 2134 |
1771 else: | 2135 if resuming or download_complete: |
1772 download_start_point = existing_file_size | 2136 # Catch up our digester with the hash data. |
1773 serialization_dict['progress'] = download_start_point | 2137 bytes_digested = 0 |
1774 logger.info('Resuming download for %s', src_url.url_string) | 2138 total_bytes_to_digest = download_start_byte - start_byte |
1775 # Catch up our digester with the hash data. | 2139 hash_callback = ProgressCallbackWithBackoff( |
1776 if existing_file_size > TEN_MIB: | 2140 total_bytes_to_digest, |
1777 for alg_name in digesters: | 2141 FileProgressCallbackHandler( |
1778 logger.info( | 2142 ConstructAnnounceText('Hashing', |
1779 'Catching up %s for %s', alg_name, download_file_name) | 2143 dst_url.url_string), logger).call) |
1780 with open(download_file_name, 'rb') as hash_fp: | 2144 |
1781 while True: | 2145 while bytes_digested < total_bytes_to_digest: |
1782 data = hash_fp.read(DEFAULT_FILE_BUFFER_SIZE) | 2146 bytes_to_read = min(DEFAULT_FILE_BUFFER_SIZE, |
1783 if not data: | 2147 total_bytes_to_digest - bytes_digested) |
1784 break | 2148 data = fp.read(bytes_to_read) |
1785 for alg_name in digesters: | 2149 bytes_digested += bytes_to_read |
1786 digesters[alg_name].update(data) | 2150 for alg_name in digesters: |
1787 else: | 2151 digesters[alg_name].update(data) |
1788 # Starting a new download, blow away whatever is already there. | 2152 hash_callback.Progress(len(data)) |
1789 fp.truncate(0) | 2153 |
1790 fp.seek(0) | 2154 elif not is_sliced: |
1791 | 2155 # Delete file contents and start entire object download from scratch. |
1792 else: | 2156 fp.truncate(0) |
1793 raise CommandException('Invalid download strategy %s chosen for' | 2157 existing_file_size = 0 |
1794 'file %s' % (download_strategy, fp.name)) | |
1795 | |
1796 if not dst_url.IsStream(): | |
1797 serialization_data = json.dumps(serialization_dict) | |
1798 | 2158 |
1799 progress_callback = FileProgressCallbackHandler( | 2159 progress_callback = FileProgressCallbackHandler( |
1800 ConstructAnnounceText('Downloading', dst_url.url_string), | 2160 ConstructAnnounceText('Downloading', dst_url.url_string), logger, |
1801 logger).call | 2161 start_byte, download_size).call |
| 2162 |
1802 if global_copy_helper_opts.test_callback_file: | 2163 if global_copy_helper_opts.test_callback_file: |
1803 with open(global_copy_helper_opts.test_callback_file, 'rb') as test_fp: | 2164 with open(global_copy_helper_opts.test_callback_file, 'rb') as test_fp: |
1804 progress_callback = pickle.loads(test_fp.read()).call | 2165 progress_callback = pickle.loads(test_fp.read()).call |
1805 | 2166 |
1806 start_time = time.time() | 2167 if is_sliced and src_obj_metadata.size >= ResumableThreshold(): |
| 2168 fp = SlicedDownloadFileWrapper(fp, tracker_file_name, src_obj_metadata, |
| 2169 start_byte, end_byte) |
| 2170 |
1807 # TODO: With gzip encoding (which may occur on-the-fly and not be part of | 2171 # TODO: With gzip encoding (which may occur on-the-fly and not be part of |
1808 # the object's metadata), when we request a range to resume, it's possible | 2172 # the object's metadata), when we request a range to resume, it's possible |
1809 # that the server will just resend the entire object, which means our | 2173 # that the server will just resend the entire object, which means our |
1810 # caught-up hash will be incorrect. We recalculate the hash on | 2174 # caught-up hash will be incorrect. We recalculate the hash on |
1811 # the local file in the case of a failed gzip hash anyway, but it would | 2175 # the local file in the case of a failed gzip hash anyway, but it would |
1812 # be better if we actively detected this case. | 2176 # be better if we actively detected this case. |
1813 if not download_complete: | 2177 if not download_complete: |
| 2178 fp.seek(download_start_byte) |
1814 server_encoding = gsutil_api.GetObjectMedia( | 2179 server_encoding = gsutil_api.GetObjectMedia( |
1815 src_url.bucket_name, src_url.object_name, fp, | 2180 src_url.bucket_name, src_url.object_name, fp, |
1816 start_byte=download_start_point, generation=src_url.generation, | 2181 start_byte=download_start_byte, end_byte=end_byte, |
1817 object_size=src_obj_metadata.size, | 2182 generation=src_url.generation, object_size=src_obj_metadata.size, |
1818 download_strategy=download_strategy, provider=src_url.scheme, | 2183 download_strategy=CloudApi.DownloadStrategy.RESUMABLE, |
1819 serialization_data=serialization_data, digesters=digesters, | 2184 provider=src_url.scheme, serialization_data=serialization_data, |
1820 progress_callback=progress_callback) | 2185 digesters=digesters, progress_callback=progress_callback) |
1821 | |
1822 end_time = time.time() | |
1823 | |
1824 # If a custom test method is defined, call it here. For the copy command, | |
1825 # test methods are expected to take one argument: an open file pointer, | |
1826 # and are used to perturb the open file during download to exercise | |
1827 # download error detection. | |
1828 if test_method: | |
1829 test_method(fp) | |
1830 | 2186 |
1831 except ResumableDownloadException as e: | 2187 except ResumableDownloadException as e: |
1832 logger.warning('Caught ResumableDownloadException (%s) for file %s.', | 2188 logger.warning('Caught ResumableDownloadException (%s) for download of %s.', |
1833 e.reason, file_name) | 2189 e.reason, download_name) |
1834 raise | 2190 raise |
1835 | |
1836 finally: | 2191 finally: |
1837 if fp: | 2192 if fp: |
1838 fp.close() | 2193 fp.close() |
1839 for file_name in open_files: | |
1840 open_files_map.delete(file_name) | |
1841 | 2194 |
1842 # If we decompressed a content-encoding gzip file on the fly, this may not | 2195 bytes_transferred = end_byte - download_start_byte + 1 |
1843 # be accurate, but it is the best we can do without going deep into the | 2196 return bytes_transferred, server_encoding |
1844 # underlying HTTP libraries. Note that this value is only used for | 2197 |
1845 # reporting in log messages; inaccuracy doesn't impact the integrity of the | 2198 |
1846 # download. | 2199 def _DownloadObjectToFileNonResumable(src_url, src_obj_metadata, dst_url, |
1847 bytes_transferred = src_obj_metadata.size - download_start_point | 2200 download_file_name, gsutil_api, logger, |
| 2201 digesters): |
| 2202 """Downloads an object to a local file using the non-resumable strategy. |
| 2203 |
| 2204 Args: |
| 2205 src_url: Source CloudUrl. |
| 2206 src_obj_metadata: Metadata from the source object. |
| 2207 dst_url: Destination FileUrl. |
| 2208 download_file_name: Temporary file name to be used for download. |
| 2209 gsutil_api: gsutil Cloud API instance to use for the download. |
| 2210 logger: for outputting log messages. |
| 2211 digesters: Digesters corresponding to the hash algorithms that will be used |
| 2212 for validation. |
| 2213 Returns: |
| 2214 (bytes_transferred, server_encoding) |
| 2215 bytes_transferred: Number of bytes transferred from server this call. |
| 2216 server_encoding: Content-encoding string if it was detected that the server |
| 2217 sent encoded bytes during transfer, None otherwise. |
| 2218 """ |
| 2219 try: |
 | 2220   fp = open(download_file_name, 'wb')
| 2221 |
| 2222 # This is used to pass the mediaLink and the size into the download so that |
| 2223 # we can avoid making an extra HTTP call. |
| 2224 serialization_data = GetDownloadSerializationData(src_obj_metadata) |
| 2225 |
| 2226 progress_callback = FileProgressCallbackHandler( |
| 2227 ConstructAnnounceText('Downloading', dst_url.url_string), logger).call |
| 2228 |
| 2229 if global_copy_helper_opts.test_callback_file: |
| 2230 with open(global_copy_helper_opts.test_callback_file, 'rb') as test_fp: |
| 2231 progress_callback = pickle.loads(test_fp.read()).call |
| 2232 |
| 2233 server_encoding = gsutil_api.GetObjectMedia( |
| 2234 src_url.bucket_name, src_url.object_name, fp, |
| 2235 generation=src_url.generation, object_size=src_obj_metadata.size, |
| 2236 download_strategy=CloudApi.DownloadStrategy.ONE_SHOT, |
| 2237 provider=src_url.scheme, serialization_data=serialization_data, |
| 2238 digesters=digesters, progress_callback=progress_callback) |
| 2239 finally: |
| 2240 if fp: |
| 2241 fp.close() |
| 2242 |
| 2243 return src_obj_metadata.size, server_encoding |
| 2244 |
| 2245 |
| 2246 def _DownloadObjectToFile(src_url, src_obj_metadata, dst_url, |
| 2247 gsutil_api, logger, command_obj, |
| 2248 copy_exception_handler, allow_splitting=True): |
| 2249 """Downloads an object to a local file. |
| 2250 |
| 2251 Args: |
| 2252 src_url: Source CloudUrl. |
| 2253 src_obj_metadata: Metadata from the source object. |
| 2254 dst_url: Destination FileUrl. |
| 2255 gsutil_api: gsutil Cloud API instance to use for the download. |
| 2256 logger: for outputting log messages. |
| 2257 command_obj: command object for use in Apply in sliced downloads. |
| 2258 copy_exception_handler: For handling copy exceptions during Apply. |
| 2259 allow_splitting: Whether or not to allow sliced download. |
| 2260 Returns: |
| 2261 (elapsed_time, bytes_transferred, dst_url, md5), where time elapsed |
| 2262 excludes initial GET. |
| 2263 |
| 2264 Raises: |
| 2265 FileConcurrencySkipError: if this download is already in progress. |
| 2266 CommandException: if other errors encountered. |
| 2267 """ |
| 2268 global open_files_map, open_files_lock |
| 2269 if dst_url.object_name.endswith(dst_url.delim): |
| 2270 logger.warn('\n'.join(textwrap.wrap( |
| 2271 'Skipping attempt to download to filename ending with slash (%s). This ' |
| 2272 'typically happens when using gsutil to download from a subdirectory ' |
| 2273 'created by the Cloud Console (https://cloud.google.com/console)' |
| 2274 % dst_url.object_name))) |
| 2275 return (0, 0, dst_url, '') |
| 2276 |
| 2277 api_selector = gsutil_api.GetApiSelector(provider=src_url.scheme) |
| 2278 download_strategy = _SelectDownloadStrategy(dst_url) |
| 2279 sliced_download = _ShouldDoSlicedDownload( |
| 2280 download_strategy, src_obj_metadata, allow_splitting, logger) |
| 2281 |
| 2282 download_file_name, need_to_unzip = _GetDownloadFile( |
| 2283 dst_url, src_obj_metadata, logger) |
| 2284 |
| 2285 # Ensure another process/thread is not already writing to this file. |
| 2286 with open_files_lock: |
| 2287 if open_files_map.get(download_file_name, False): |
| 2288 raise FileConcurrencySkipError |
| 2289 open_files_map[download_file_name] = True |
| 2290 |
| 2291 # Set up hash digesters. |
| 2292 consider_md5 = src_obj_metadata.md5Hash and not sliced_download |
| 2293 hash_algs = GetDownloadHashAlgs(logger, consider_md5=consider_md5, |
| 2294 consider_crc32c=src_obj_metadata.crc32c) |
| 2295 digesters = dict((alg, hash_algs[alg]()) for alg in hash_algs or {}) |
| 2296 |
| 2297 # Tracks whether the server used a gzip encoding. |
| 2298 server_encoding = None |
| 2299 download_complete = (src_obj_metadata.size == 0) |
| 2300 bytes_transferred = 0 |
| 2301 |
| 2302 start_time = time.time() |
| 2303 if not download_complete: |
| 2304 if sliced_download: |
| 2305 (bytes_transferred, crc32c) = ( |
| 2306 _DoSlicedDownload(src_url, src_obj_metadata, dst_url, |
| 2307 download_file_name, command_obj, logger, |
| 2308 copy_exception_handler, api_selector)) |
| 2309 if 'crc32c' in digesters: |
| 2310 digesters['crc32c'].crcValue = crc32c |
| 2311 elif download_strategy is CloudApi.DownloadStrategy.ONE_SHOT: |
| 2312 (bytes_transferred, server_encoding) = ( |
| 2313 _DownloadObjectToFileNonResumable(src_url, src_obj_metadata, dst_url, |
| 2314 download_file_name, gsutil_api, |
| 2315 logger, digesters)) |
| 2316 elif download_strategy is CloudApi.DownloadStrategy.RESUMABLE: |
| 2317 (bytes_transferred, server_encoding) = ( |
| 2318 _DownloadObjectToFileResumable(src_url, src_obj_metadata, dst_url, |
| 2319 download_file_name, gsutil_api, logger, |
| 2320 digesters)) |
| 2321 else: |
 | 2322       raise CommandException('Invalid download strategy %s chosen for '
| 2323 'file %s' % (download_strategy, |
| 2324 download_file_name)) |
| 2325 end_time = time.time() |
| 2326 |
1848 server_gzip = server_encoding and server_encoding.lower().endswith('gzip') | 2327 server_gzip = server_encoding and server_encoding.lower().endswith('gzip') |
1849 local_md5 = _ValidateDownloadHashes(logger, src_url, src_obj_metadata, | 2328 local_md5 = _ValidateAndCompleteDownload( |
1850 dst_url, need_to_unzip, server_gzip, | 2329 logger, src_url, src_obj_metadata, dst_url, need_to_unzip, server_gzip, |
1851 digesters, hash_algs, api_selector, | 2330 digesters, hash_algs, download_file_name, api_selector, bytes_transferred) |
1852 bytes_transferred) | 2331 |
| 2332 with open_files_lock: |
| 2333 open_files_map.delete(download_file_name) |
1853 | 2334 |
1854 return (end_time - start_time, bytes_transferred, dst_url, local_md5) | 2335 return (end_time - start_time, bytes_transferred, dst_url, local_md5) |
1855 | 2336 |
1856 | 2337 |
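
The open-files bookkeeping in _DownloadObjectToFile above is a shared map guarded by a lock. Below is a minimal sketch of that claim/release pattern; the claim and release helpers are hypothetical, and gsutil's actual open_files_map is a module-level global whose type also exposes a delete() method.

import threading

open_files_map = {}
open_files_lock = threading.Lock()

def claim(path):
  # Mirrors the FileConcurrencySkipError check above.
  with open_files_lock:
    if open_files_map.get(path, False):
      raise RuntimeError('%s is already being downloaded' % path)
    open_files_map[path] = True

def release(path):
  with open_files_lock:
    open_files_map.pop(path, None)
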
1857 def _GetDownloadZipFileName(file_name): | 2338 def _GetDownloadTempZipFileName(dst_url): |
1858 """Returns the file name for a temporarily compressed downloaded file.""" | 2339 """Returns temporary file name for a temporarily compressed download.""" |
1859 return '%s_.gztmp' % file_name | 2340 return '%s_.gztmp' % dst_url.object_name |
1860 | 2341 |
1861 | 2342 |
1862 def _ValidateDownloadHashes(logger, src_url, src_obj_metadata, dst_url, | 2343 def _GetDownloadTempFileName(dst_url): |
1863 need_to_unzip, server_gzip, digesters, hash_algs, | 2344 """Returns temporary download file name for uncompressed downloads.""" |
1864 api_selector, bytes_transferred): | 2345 return '%s_.gstmp' % dst_url.object_name |
1865 """Validates a downloaded file's integrity. | 2346 |
| 2347 |
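
For illustration, with a hypothetical destination of /data/report.csv the two helpers above produce the temporary names that _ValidateAndCompleteDownload (below) later renames into place once integrity checks pass:

object_name = '/data/report.csv'  # hypothetical destination path

download_tmp = '%s_.gstmp' % object_name  # '/data/report.csv_.gstmp'
gzip_tmp = '%s_.gztmp' % object_name      # '/data/report.csv_.gztmp'
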
| 2348 def _ValidateAndCompleteDownload(logger, src_url, src_obj_metadata, dst_url, |
| 2349 need_to_unzip, server_gzip, digesters, |
| 2350 hash_algs, download_file_name, |
| 2351 api_selector, bytes_transferred): |
| 2352 """Validates and performs necessary operations on a downloaded file. |
| 2353 |
| 2354 Validates the integrity of the downloaded file using hash_algs. If the file |
| 2355 was compressed (temporarily), the file will be decompressed. Then, if the |
| 2356 integrity of the file was successfully validated, the file will be moved |
| 2357 from its temporary download location to its permanent location on disk. |
1866 | 2358 |
1867 Args: | 2359 Args: |
1868 logger: For outputting log messages. | 2360 logger: For outputting log messages. |
1869 src_url: StorageUrl for the source object. | 2361 src_url: StorageUrl for the source object. |
1870 src_obj_metadata: Metadata for the source object, potentially containing | 2362 src_obj_metadata: Metadata for the source object, potentially containing |
1871 hash values. | 2363 hash values. |
1872 dst_url: StorageUrl describing the destination file. | 2364 dst_url: StorageUrl describing the destination file. |
1873 need_to_unzip: If true, a temporary zip file was used and must be | 2365 need_to_unzip: If true, a temporary zip file was used and must be |
1874 uncompressed as part of validation. | 2366 uncompressed as part of validation. |
1875 server_gzip: If true, the server gzipped the bytes (regardless of whether | 2367 server_gzip: If true, the server gzipped the bytes (regardless of whether |
1876 the object metadata claimed it was gzipped). | 2368 the object metadata claimed it was gzipped). |
1877 digesters: dict of {string, hash digester} that contains up-to-date digests | 2369 digesters: dict of {string, hash digester} that contains up-to-date digests |
1878 computed during the download. If a digester for a particular | 2370 computed during the download. If a digester for a particular |
1879 algorithm is None, an up-to-date digest is not available and the | 2371 algorithm is None, an up-to-date digest is not available and the |
1880 hash must be recomputed from the local file. | 2372 hash must be recomputed from the local file. |
1881 hash_algs: dict of {string, hash algorithm} that can be used if digesters | 2373 hash_algs: dict of {string, hash algorithm} that can be used if digesters |
1882 don't have up-to-date digests. | 2374 don't have up-to-date digests. |
| 2375 download_file_name: Temporary file name that was used for download. |
1883     api_selector: The Cloud API implementation used (used for tracker file naming). | 2376     api_selector: The Cloud API implementation used (used for tracker file naming).
1884 bytes_transferred: Number of bytes downloaded (used for logging). | 2377 bytes_transferred: Number of bytes downloaded (used for logging). |
1885 | 2378 |
1886 Returns: | 2379 Returns: |
1887 An MD5 of the local file, if one was calculated as part of the integrity | 2380 An MD5 of the local file, if one was calculated as part of the integrity |
1888 check. | 2381 check. |
1889 """ | 2382 """ |
1890 file_name = dst_url.object_name | 2383 final_file_name = dst_url.object_name |
1891 download_file_name = (_GetDownloadZipFileName(file_name) if need_to_unzip else | 2384 file_name = download_file_name |
1892 file_name) | |
1893 digesters_succeeded = True | 2385 digesters_succeeded = True |
| 2386 |
1894 for alg in digesters: | 2387 for alg in digesters: |
1895 # If we get a digester with a None algorithm, the underlying | 2388 # If we get a digester with a None algorithm, the underlying |
1896 # implementation failed to calculate a digest, so we will need to | 2389 # implementation failed to calculate a digest, so we will need to |
1897 # calculate one from scratch. | 2390 # calculate one from scratch. |
1898 if not digesters[alg]: | 2391 if not digesters[alg]: |
1899 digesters_succeeded = False | 2392 digesters_succeeded = False |
1900 break | 2393 break |
1901 | 2394 |
1902 if digesters_succeeded: | 2395 if digesters_succeeded: |
1903 local_hashes = _CreateDigestsFromDigesters(digesters) | 2396 local_hashes = _CreateDigestsFromDigesters(digesters) |
1904 else: | 2397 else: |
1905 local_hashes = _CreateDigestsFromLocalFile( | 2398 local_hashes = _CreateDigestsFromLocalFile( |
1906 logger, hash_algs, download_file_name, src_obj_metadata) | 2399 logger, hash_algs, file_name, final_file_name, src_obj_metadata) |
1907 | 2400 |
1908 digest_verified = True | 2401 digest_verified = True |
1909 hash_invalid_exception = None | 2402 hash_invalid_exception = None |
1910 try: | 2403 try: |
1911 _CheckHashes(logger, src_url, src_obj_metadata, download_file_name, | 2404 _CheckHashes(logger, src_url, src_obj_metadata, final_file_name, |
1912 local_hashes) | 2405 local_hashes) |
1913 DeleteTrackerFile(GetTrackerFilePath( | 2406 DeleteDownloadTrackerFiles(dst_url, api_selector) |
1914 dst_url, TrackerFileType.DOWNLOAD, api_selector)) | |
1915 except HashMismatchException, e: | 2407 except HashMismatchException, e: |
1916   # If a non-gzipped object gets sent with gzip content encoding, the hash | 2408   # If a non-gzipped object gets sent with gzip content encoding, the hash
1917 # we calculate will match the gzipped bytes, not the original object. Thus, | 2409 # we calculate will match the gzipped bytes, not the original object. Thus, |
1918 # we'll need to calculate and check it after unzipping. | 2410 # we'll need to calculate and check it after unzipping. |
1919 if server_gzip: | 2411 if server_gzip: |
1920 logger.debug( | 2412 logger.debug( |
1921 'Hash did not match but server gzipped the content, will ' | 2413 'Hash did not match but server gzipped the content, will ' |
1922 'recalculate.') | 2414 'recalculate.') |
1923 digest_verified = False | 2415 digest_verified = False |
1924 elif api_selector == ApiSelector.XML: | 2416 elif api_selector == ApiSelector.XML: |
1925 logger.debug( | 2417 logger.debug( |
1926 'Hash did not match but server may have gzipped the content, will ' | 2418 'Hash did not match but server may have gzipped the content, will ' |
1927 'recalculate.') | 2419 'recalculate.') |
1928 # Save off the exception in case this isn't a gzipped file. | 2420 # Save off the exception in case this isn't a gzipped file. |
1929 hash_invalid_exception = e | 2421 hash_invalid_exception = e |
1930 digest_verified = False | 2422 digest_verified = False |
1931 else: | 2423 else: |
1932 DeleteTrackerFile(GetTrackerFilePath( | 2424 DeleteDownloadTrackerFiles(dst_url, api_selector) |
1933 dst_url, TrackerFileType.DOWNLOAD, api_selector)) | |
1934 if _RENAME_ON_HASH_MISMATCH: | 2425 if _RENAME_ON_HASH_MISMATCH: |
1935 os.rename(download_file_name, | 2426 os.rename(file_name, |
1936 download_file_name + _RENAME_ON_HASH_MISMATCH_SUFFIX) | 2427 final_file_name + _RENAME_ON_HASH_MISMATCH_SUFFIX) |
1937 else: | 2428 else: |
1938 os.unlink(download_file_name) | 2429 os.unlink(file_name) |
1939 raise | 2430 raise |
1940 | 2431 |
1941 if server_gzip and not need_to_unzip: | |
1942 # Server compressed bytes on-the-fly, thus we need to rename and decompress. | |
1943 # We can't decompress on-the-fly because prior to Python 3.2 the gzip | |
1944 # module makes a bunch of seek calls on the stream. | |
1945 download_file_name = _GetDownloadZipFileName(file_name) | |
1946 os.rename(file_name, download_file_name) | |
1947 | |
1948 if need_to_unzip or server_gzip: | 2432 if need_to_unzip or server_gzip: |
1949 # Log that we're uncompressing if the file is big enough that | 2433 # Log that we're uncompressing if the file is big enough that |
1950 # decompressing would make it look like the transfer "stalled" at the end. | 2434 # decompressing would make it look like the transfer "stalled" at the end. |
1951 if bytes_transferred > TEN_MIB: | 2435 if bytes_transferred > TEN_MIB: |
1952 logger.info( | 2436 logger.info( |
1953 'Uncompressing downloaded tmp file to %s...', file_name) | 2437 'Uncompressing temporarily gzipped file to %s...', final_file_name) |
1954 | 2438 |
1955 # Downloaded gzipped file to a filename w/o .gz extension, so unzip. | |
1956 gzip_fp = None | 2439 gzip_fp = None |
1957 try: | 2440 try: |
1958 gzip_fp = gzip.open(download_file_name, 'rb') | 2441 # Downloaded temporarily gzipped file, unzip to file without '_.gztmp' |
1959 with open(file_name, 'wb') as f_out: | 2442 # suffix. |
| 2443 gzip_fp = gzip.open(file_name, 'rb') |
| 2444 with open(final_file_name, 'wb') as f_out: |
1960 data = gzip_fp.read(GZIP_CHUNK_SIZE) | 2445 data = gzip_fp.read(GZIP_CHUNK_SIZE) |
1961 while data: | 2446 while data: |
1962 f_out.write(data) | 2447 f_out.write(data) |
1963 data = gzip_fp.read(GZIP_CHUNK_SIZE) | 2448 data = gzip_fp.read(GZIP_CHUNK_SIZE) |
1964 except IOError, e: | 2449 except IOError, e: |
1965 # In the XML case where we don't know if the file was gzipped, raise | 2450 # In the XML case where we don't know if the file was gzipped, raise |
1966 # the original hash exception if we find that it wasn't. | 2451 # the original hash exception if we find that it wasn't. |
1967 if 'Not a gzipped file' in str(e) and hash_invalid_exception: | 2452 if 'Not a gzipped file' in str(e) and hash_invalid_exception: |
1968 # Linter improperly thinks we're raising None despite the above check. | 2453 # Linter improperly thinks we're raising None despite the above check. |
1969 # pylint: disable=raising-bad-type | 2454 # pylint: disable=raising-bad-type |
1970 raise hash_invalid_exception | 2455 raise hash_invalid_exception |
1971 finally: | 2456 finally: |
1972 if gzip_fp: | 2457 if gzip_fp: |
1973 gzip_fp.close() | 2458 gzip_fp.close() |
1974 | 2459 |
1975 os.unlink(download_file_name) | 2460 os.unlink(file_name) |
| 2461 file_name = final_file_name |
1976 | 2462 |
1977 if not digest_verified: | 2463 if not digest_verified: |
1978 try: | 2464 try: |
1979 # Recalculate hashes on the unzipped local file. | 2465 # Recalculate hashes on the unzipped local file. |
1980 local_hashes = _CreateDigestsFromLocalFile(logger, hash_algs, file_name, | 2466 local_hashes = _CreateDigestsFromLocalFile( |
1981 src_obj_metadata) | 2467 logger, hash_algs, file_name, final_file_name, src_obj_metadata) |
1982 _CheckHashes(logger, src_url, src_obj_metadata, file_name, local_hashes) | 2468 _CheckHashes(logger, src_url, src_obj_metadata, final_file_name, |
1983 DeleteTrackerFile(GetTrackerFilePath( | 2469 local_hashes) |
1984 dst_url, TrackerFileType.DOWNLOAD, api_selector)) | 2470 DeleteDownloadTrackerFiles(dst_url, api_selector) |
1985 except HashMismatchException: | 2471 except HashMismatchException: |
1986 DeleteTrackerFile(GetTrackerFilePath( | 2472 DeleteDownloadTrackerFiles(dst_url, api_selector) |
1987 dst_url, TrackerFileType.DOWNLOAD, api_selector)) | |
1988 if _RENAME_ON_HASH_MISMATCH: | 2473 if _RENAME_ON_HASH_MISMATCH: |
1989 os.rename(file_name, | 2474 os.rename(file_name, |
1990 file_name + _RENAME_ON_HASH_MISMATCH_SUFFIX) | 2475 file_name + _RENAME_ON_HASH_MISMATCH_SUFFIX) |
1991 else: | 2476 else: |
1992 os.unlink(file_name) | 2477 os.unlink(file_name) |
1993 raise | 2478 raise |
1994 | 2479 |
| 2480 if file_name != final_file_name: |
| 2481 # Data is still in a temporary file, so move it to a permanent location. |
| 2482 if os.path.exists(final_file_name): |
| 2483 os.unlink(final_file_name) |
| 2484 os.rename(file_name, |
| 2485 final_file_name) |
| 2486 |
1995 if 'md5' in local_hashes: | 2487 if 'md5' in local_hashes: |
1996 return local_hashes['md5'] | 2488 return local_hashes['md5'] |
1997 | 2489 |
1998 | 2490 |
1999 def _CopyFileToFile(src_url, dst_url): | 2491 def _CopyFileToFile(src_url, dst_url): |
2000 """Copies a local file to a local file. | 2492 """Copies a local file to a local file. |
2001 | 2493 |
2002 Args: | 2494 Args: |
2003 src_url: Source FileUrl. | 2495 src_url: Source FileUrl. |
2004 dst_url: Destination FileUrl. | 2496 dst_url: Destination FileUrl. |
(...skipping 111 matching lines...)
2116 result_url, uploaded_object.generation) | 2608 result_url, uploaded_object.generation) |
2117 | 2609 |
2118 return (end_time - start_time, src_obj_metadata.size, result_url, | 2610 return (end_time - start_time, src_obj_metadata.size, result_url, |
2119 uploaded_object.md5Hash) | 2611 uploaded_object.md5Hash) |
2120 | 2612 |
2121 | 2613 |
2122 # pylint: disable=undefined-variable | 2614 # pylint: disable=undefined-variable |
2123 # pylint: disable=too-many-statements | 2615 # pylint: disable=too-many-statements |
2124 def PerformCopy(logger, src_url, dst_url, gsutil_api, command_obj, | 2616 def PerformCopy(logger, src_url, dst_url, gsutil_api, command_obj, |
2125 copy_exception_handler, allow_splitting=True, | 2617 copy_exception_handler, allow_splitting=True, |
2126 headers=None, manifest=None, gzip_exts=None, test_method=None): | 2618 headers=None, manifest=None, gzip_exts=None): |
2127 """Performs copy from src_url to dst_url, handling various special cases. | 2619 """Performs copy from src_url to dst_url, handling various special cases. |
2128 | 2620 |
2129 Args: | 2621 Args: |
2130 logger: for outputting log messages. | 2622 logger: for outputting log messages. |
2131 src_url: Source StorageUrl. | 2623 src_url: Source StorageUrl. |
2132 dst_url: Destination StorageUrl. | 2624 dst_url: Destination StorageUrl. |
2133 gsutil_api: gsutil Cloud API instance to use for the copy. | 2625 gsutil_api: gsutil Cloud API instance to use for the copy. |
2134 command_obj: command object for use in Apply in parallel composite uploads. | 2626 command_obj: command object for use in Apply in parallel composite uploads |
| 2627 and sliced object downloads. |
2135 copy_exception_handler: for handling copy exceptions during Apply. | 2628 copy_exception_handler: for handling copy exceptions during Apply. |
2136 allow_splitting: Whether to allow the file to be split into component | 2629 allow_splitting: Whether to allow the file to be split into component |
2137     pieces for a parallel composite upload. | 2630     pieces for a parallel composite upload or download.
2138 headers: optional headers to use for the copy operation. | 2631 headers: optional headers to use for the copy operation. |
2139 manifest: optional manifest for tracking copy operations. | 2632 manifest: optional manifest for tracking copy operations. |
2140 gzip_exts: List of file extensions to gzip for uploads, if any. | 2633 gzip_exts: List of file extensions to gzip for uploads, if any. |
2141 test_method: optional test method for modifying files during unit tests. | |
2142 | 2634 |
2143 Returns: | 2635 Returns: |
2144 (elapsed_time, bytes_transferred, version-specific dst_url) excluding | 2636 (elapsed_time, bytes_transferred, version-specific dst_url) excluding |
2145 overhead like initial GET. | 2637 overhead like initial GET. |
2146 | 2638 |
2147 Raises: | 2639 Raises: |
2148 ItemExistsError: if no clobber flag is specified and the destination | 2640 ItemExistsError: if no clobber flag is specified and the destination |
2149 object already exists. | 2641 object already exists. |
2150 SkipUnsupportedObjectError: if skip_unsupported_objects flag is specified | 2642 SkipUnsupportedObjectError: if skip_unsupported_objects flag is specified |
2151 and the source is an unsupported type. | 2643 and the source is an unsupported type. |
(...skipping 35 matching lines...)
2187 if global_copy_helper_opts.preserve_acl: | 2679 if global_copy_helper_opts.preserve_acl: |
2188 src_obj_fields.append('acl') | 2680 src_obj_fields.append('acl') |
2189 if (src_url.scheme == dst_url.scheme | 2681 if (src_url.scheme == dst_url.scheme |
2190 and not global_copy_helper_opts.daisy_chain): | 2682 and not global_copy_helper_opts.daisy_chain): |
2191 copy_in_the_cloud = True | 2683 copy_in_the_cloud = True |
2192 else: | 2684 else: |
2193 copy_in_the_cloud = False | 2685 copy_in_the_cloud = False |
2194 else: | 2686 else: |
2195 # Just get the fields needed to validate the download. | 2687 # Just get the fields needed to validate the download. |
2196 src_obj_fields = ['crc32c', 'contentEncoding', 'contentType', 'etag', | 2688 src_obj_fields = ['crc32c', 'contentEncoding', 'contentType', 'etag', |
2197 'mediaLink', 'md5Hash', 'size'] | 2689 'mediaLink', 'md5Hash', 'size', 'generation'] |
2198 | 2690 |
2199 if (src_url.scheme == 's3' and | 2691 if (src_url.scheme == 's3' and |
2200 global_copy_helper_opts.skip_unsupported_objects): | 2692 global_copy_helper_opts.skip_unsupported_objects): |
2201 src_obj_fields.append('storageClass') | 2693 src_obj_fields.append('storageClass') |
2202 | 2694 |
2203 try: | 2695 try: |
2204 src_generation = GenerationFromUrlAndString(src_url, src_url.generation) | 2696 src_generation = GenerationFromUrlAndString(src_url, src_url.generation) |
2205 src_obj_metadata = gsutil_api.GetObjectMetadata( | 2697 src_obj_metadata = gsutil_api.GetObjectMetadata( |
2206 src_url.bucket_name, src_url.object_name, | 2698 src_url.bucket_name, src_url.object_name, |
2207 generation=src_generation, provider=src_url.scheme, | 2699 generation=src_generation, provider=src_url.scheme, |
(...skipping 16 matching lines...)
2224 # dst_url will be verified in _CopyObjToObjDaisyChainMode if it | 2716 # dst_url will be verified in _CopyObjToObjDaisyChainMode if it |
2225 # is not s3 (and thus differs from src_url). | 2717 # is not s3 (and thus differs from src_url). |
2226 if src_url.scheme == 's3': | 2718 if src_url.scheme == 's3': |
2227 acl_text = S3MarkerAclFromObjectMetadata(src_obj_metadata) | 2719 acl_text = S3MarkerAclFromObjectMetadata(src_obj_metadata) |
2228 if acl_text: | 2720 if acl_text: |
2229 AddS3MarkerAclToObjectMetadata(dst_obj_metadata, acl_text) | 2721 AddS3MarkerAclToObjectMetadata(dst_obj_metadata, acl_text) |
2230 else: | 2722 else: |
2231 try: | 2723 try: |
2232 src_obj_filestream = GetStreamFromFileUrl(src_url) | 2724 src_obj_filestream = GetStreamFromFileUrl(src_url) |
2233 except Exception, e: # pylint: disable=broad-except | 2725 except Exception, e: # pylint: disable=broad-except |
2234 raise CommandException('Error opening file "%s": %s.' % (src_url, | 2726 if command_obj.continue_on_error: |
2235 e.message)) | 2727 message = 'Error copying %s: %s' % (src_url, str(e)) |
| 2728 command_obj.op_failure_count += 1 |
| 2729 logger.error(message) |
| 2730 return |
| 2731 else: |
| 2732 raise CommandException('Error opening file "%s": %s.' % (src_url, |
| 2733 e.message)) |
2236 if src_url.IsStream(): | 2734 if src_url.IsStream(): |
2237 src_obj_size = None | 2735 src_obj_size = None |
2238 else: | 2736 else: |
2239 src_obj_size = os.path.getsize(src_url.object_name) | 2737 src_obj_size = os.path.getsize(src_url.object_name) |
2240 | 2738 |
2241 if global_copy_helper_opts.use_manifest: | 2739 if global_copy_helper_opts.use_manifest: |
2242 # Set the source size in the manifest. | 2740 # Set the source size in the manifest. |
2243 manifest.Set(src_url.url_string, 'size', src_obj_size) | 2741 manifest.Set(src_url.url_string, 'size', src_obj_size) |
2244 | 2742 |
2245 if (dst_url.scheme == 's3' and src_obj_size > S3_MAX_UPLOAD_SIZE | 2743 if (dst_url.scheme == 's3' and src_obj_size > S3_MAX_UPLOAD_SIZE |
(...skipping 55 matching lines...)
2301 _SetContentTypeFromFile(src_url, dst_obj_metadata) | 2799 _SetContentTypeFromFile(src_url, dst_obj_metadata) |
2302 else: | 2800 else: |
2303 # Files don't have Cloud API metadata. | 2801 # Files don't have Cloud API metadata. |
2304 dst_obj_metadata = None | 2802 dst_obj_metadata = None |
2305 | 2803 |
2306 _LogCopyOperation(logger, src_url, dst_url, dst_obj_metadata) | 2804 _LogCopyOperation(logger, src_url, dst_url, dst_obj_metadata) |
2307 | 2805 |
2308 if src_url.IsCloudUrl(): | 2806 if src_url.IsCloudUrl(): |
2309 if dst_url.IsFileUrl(): | 2807 if dst_url.IsFileUrl(): |
2310 return _DownloadObjectToFile(src_url, src_obj_metadata, dst_url, | 2808 return _DownloadObjectToFile(src_url, src_obj_metadata, dst_url, |
2311 gsutil_api, logger, test_method=test_method) | 2809 gsutil_api, logger, command_obj, |
| 2810 copy_exception_handler, |
| 2811 allow_splitting=allow_splitting) |
2312 elif copy_in_the_cloud: | 2812 elif copy_in_the_cloud: |
2313 return _CopyObjToObjInTheCloud(src_url, src_obj_metadata, dst_url, | 2813 return _CopyObjToObjInTheCloud(src_url, src_obj_metadata, dst_url, |
2314 dst_obj_metadata, preconditions, | 2814 dst_obj_metadata, preconditions, |
2315 gsutil_api, logger) | 2815 gsutil_api, logger) |
2316 else: | 2816 else: |
2317 return _CopyObjToObjDaisyChainMode(src_url, src_obj_metadata, | 2817 return _CopyObjToObjDaisyChainMode(src_url, src_obj_metadata, |
2318 dst_url, dst_obj_metadata, | 2818 dst_url, dst_obj_metadata, |
2319 preconditions, gsutil_api, logger) | 2819 preconditions, gsutil_api, logger) |
2320 else: # src_url.IsFileUrl() | 2820 else: # src_url.IsFileUrl() |
2321 if dst_url.IsCloudUrl(): | 2821 if dst_url.IsCloudUrl(): |
(...skipping 170 matching lines...)
2492 if past_scheme.find(sep) == -1: | 2992 if past_scheme.find(sep) == -1: |
2493 return 'file://' | 2993 return 'file://' |
2494 else: | 2994 else: |
2495 return 'file://%s' % past_scheme.rstrip(sep).rpartition(sep)[0] | 2995 return 'file://%s' % past_scheme.rstrip(sep).rpartition(sep)[0] |
2496 if url.IsBucket(): | 2996 if url.IsBucket(): |
2497 return '%s://' % url.scheme | 2997 return '%s://' % url.scheme |
2498 # Else it names a bucket subdir. | 2998 # Else it names a bucket subdir. |
2499 return url.url_string.rstrip(sep).rpartition(sep)[0] | 2999 return url.url_string.rstrip(sep).rpartition(sep)[0] |
2500 | 3000 |
2501 | 3001 |
2502 def _DivideAndCeil(dividend, divisor): | |
2503 """Returns ceil(dividend / divisor). | |
2504 | |
2505 Takes care to avoid the pitfalls of floating point arithmetic that could | |
2506 otherwise yield the wrong result for large numbers. | |
2507 | |
2508 Args: | |
2509 dividend: Dividend for the operation. | |
2510 divisor: Divisor for the operation. | |
2511 | |
2512 Returns: | |
2513 Quotient. | |
2514 """ | |
2515 quotient = dividend // divisor | |
2516 if (dividend % divisor) != 0: | |
2517 quotient += 1 | |
2518 return quotient | |
2519 | |
2520 | |
2521 def _GetPartitionInfo(file_size, max_components, default_component_size): | 3002 def _GetPartitionInfo(file_size, max_components, default_component_size): |
2522 """Gets info about a file partition for parallel composite uploads. | 3003 """Gets info about a file partition for parallel file/object transfers. |
2523 | 3004 |
2524 Args: | 3005 Args: |
2525 file_size: The number of bytes in the file to be partitioned. | 3006 file_size: The number of bytes in the file to be partitioned. |
2526 max_components: The maximum number of components that can be composed. | 3007 max_components: The maximum number of components that can be composed. |
2527 default_component_size: The size of a component, assuming that | 3008 default_component_size: The size of a component, assuming that |
2528 max_components is infinite. | 3009 max_components is infinite. |
2529 Returns: | 3010 Returns: |
2530 The number of components in the partitioned file, and the size of each | 3011 The number of components in the partitioned file, and the size of each |
2531 component (except the last, which will have a different size iff | 3012 component (except the last, which will have a different size iff |
2532 file_size != 0 (mod num_components)). | 3013 file_size != 0 (mod num_components)). |
2533 """ | 3014 """ |
2534 # num_components = ceil(file_size / default_component_size) | 3015 # num_components = ceil(file_size / default_component_size) |
2535 num_components = _DivideAndCeil(file_size, default_component_size) | 3016 num_components = DivideAndCeil(file_size, default_component_size) |
2536 | 3017 |
2537 # num_components must be in the range [2, max_components] | 3018 # num_components must be in the range [2, max_components] |
2538 num_components = max(min(num_components, max_components), 2) | 3019 num_components = max(min(num_components, max_components), 2) |
2539 | 3020 |
2540 # component_size = ceil(file_size / num_components) | 3021 # component_size = ceil(file_size / num_components) |
2541 component_size = _DivideAndCeil(file_size, num_components) | 3022 component_size = DivideAndCeil(file_size, num_components) |
2542 return (num_components, component_size) | 3023 return (num_components, component_size) |
2543 | 3024 |
2544 | 3025 |
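
A quick worked check of the arithmetic above, using the same integer ceiling division as DivideAndCeil; the function names and sizes below are illustrative, not gsutil's configured defaults.

def divide_and_ceil(dividend, divisor):
  # Integer ceiling division, avoiding floating point.
  quotient = dividend // divisor
  if dividend % divisor:
    quotient += 1
  return quotient

def get_partition_info(file_size, max_components, default_component_size):
  num_components = divide_and_ceil(file_size, default_component_size)
  num_components = max(min(num_components, max_components), 2)
  component_size = divide_and_ceil(file_size, num_components)
  return num_components, component_size

# A 150 MiB object with a 50 MiB default component size and at most 4
# components partitions into 3 components of 50 MiB each.
assert get_partition_info(150 << 20, 4, 50 << 20) == (3, 50 << 20)
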
2545 def _DeleteObjectFn(cls, url_to_delete, thread_state=None): | 3026 def _DeleteTempComponentObjectFn(cls, url_to_delete, thread_state=None): |
2546 """Wrapper function to be used with command.Apply().""" | 3027 """Wrapper func to be used with command.Apply to delete temporary objects.""" |
2547 gsutil_api = GetCloudApiInstance(cls, thread_state) | 3028 gsutil_api = GetCloudApiInstance(cls, thread_state) |
2548 gsutil_api.DeleteObject( | 3029 try: |
2549 url_to_delete.bucket_name, url_to_delete.object_name, | 3030 gsutil_api.DeleteObject( |
2550 generation=url_to_delete.generation, provider=url_to_delete.scheme) | 3031 url_to_delete.bucket_name, url_to_delete.object_name, |
| 3032 generation=url_to_delete.generation, provider=url_to_delete.scheme) |
| 3033 except NotFoundException: |
| 3034 # The temporary object could already be gone if a retry was |
| 3035 # issued at a lower layer but the original request succeeded. |
| 3036 # Barring other errors, the top-level command should still report success, |
| 3037 # so don't raise here. |
| 3038 pass |
2551 | 3039 |
2552 | 3040 |
2553 def _ParseParallelUploadTrackerFile(tracker_file, tracker_file_lock): | 3041 def _ParseParallelUploadTrackerFile(tracker_file, tracker_file_lock): |
2554 """Parse the tracker file from the last parallel composite upload attempt. | 3042 """Parse the tracker file from the last parallel composite upload attempt. |
2555 | 3043 |
2556 If it exists, the tracker file is of the format described in | 3044 If it exists, the tracker file is of the format described in |
2557 _CreateParallelUploadTrackerFile. If the file doesn't exist or cannot be | 3045 _CreateParallelUploadTrackerFile. If the file doesn't exist or cannot be |
2558 read, then the upload will start from the beginning. | 3046 read, then the upload will start from the beginning. |
2559 | 3047 |
2560 Args: | 3048 Args: |
(...skipping 217 matching lines...)
2778 url.generation = tracker_object.generation | 3266 url.generation = tracker_object.generation |
2779 uploaded_components.append(url) | 3267 uploaded_components.append(url) |
2780 objects_already_chosen.append(tracker_object.object_name) | 3268 objects_already_chosen.append(tracker_object.object_name) |
2781 | 3269 |
2782 if uploaded_components: | 3270 if uploaded_components: |
2783 logging.info('Found %d existing temporary components to reuse.', | 3271 logging.info('Found %d existing temporary components to reuse.', |
2784 len(uploaded_components)) | 3272 len(uploaded_components)) |
2785 | 3273 |
2786 return (components_to_upload, uploaded_components, | 3274 return (components_to_upload, uploaded_components, |
2787 existing_objects_to_delete) | 3275 existing_objects_to_delete) |