Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(165)

Side by Side Diff: gslib/copy_helper.py

Issue 698893003: Update checked in version of gsutil to version 4.6 (Closed) Base URL: http://dart.googlecode.com/svn/third_party/gsutil/
Patch Set: Created 6 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « gslib/commands/web.py ('k') | gslib/cred_types.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Property Changes:
Added: svn:eol-style
+ LF
OLDNEW
(Empty)
1 # -*- coding: utf-8 -*-
2 # Copyright 2011 Google Inc. All Rights Reserved.
3 # Copyright 2011, Nexenta Systems Inc.
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 """Helper functions for copy functionality."""
17
18 from __future__ import absolute_import
19
20 import base64
21 from collections import namedtuple
22 import csv
23 import datetime
24 import errno
25 import gzip
26 import hashlib
27 from hashlib import md5
28 import json
29 import logging
30 import mimetypes
31 import os
32 import random
33 import re
34 import shutil
35 import stat
36 import subprocess
37 import sys
38 import tempfile
39 import textwrap
40 import time
41 import traceback
42
43 from boto import config
44 import crcmod
45
46 import gslib
47 from gslib.cloud_api import ArgumentException
48 from gslib.cloud_api import CloudApi
49 from gslib.cloud_api import NotFoundException
50 from gslib.cloud_api import PreconditionException
51 from gslib.cloud_api import Preconditions
52 from gslib.cloud_api import ResumableDownloadException
53 from gslib.cloud_api import ResumableUploadAbortException
54 from gslib.cloud_api import ResumableUploadException
55 from gslib.cloud_api_helper import GetDownloadSerializationDict
56 from gslib.commands.compose import MAX_COMPOSE_ARITY
57 from gslib.commands.config import DEFAULT_PARALLEL_COMPOSITE_UPLOAD_COMPONENT_SIZE
58 from gslib.commands.config import DEFAULT_PARALLEL_COMPOSITE_UPLOAD_THRESHOLD
59 from gslib.cs_api_map import ApiSelector
60 from gslib.daisy_chain_wrapper import DaisyChainWrapper
61 from gslib.exception import CommandException
62 from gslib.file_part import FilePart
63 from gslib.hashing_helper import Base64EncodeHash
64 from gslib.hashing_helper import CalculateB64EncodedMd5FromContents
65 from gslib.hashing_helper import CalculateHashesFromContents
66 from gslib.hashing_helper import GetDownloadHashAlgs
67 from gslib.hashing_helper import GetUploadHashAlgs
68 from gslib.hashing_helper import HashingFileUploadWrapper
69 from gslib.progress_callback import ConstructAnnounceText
70 from gslib.progress_callback import FileProgressCallbackHandler
71 from gslib.progress_callback import ProgressCallbackWithBackoff
72 from gslib.storage_url import ContainsWildcard
73 from gslib.storage_url import StorageUrlFromString
74 from gslib.third_party.storage_apitools import storage_v1_messages as apitools_messages
75 from gslib.translation_helper import AddS3MarkerAclToObjectMetadata
76 from gslib.translation_helper import CopyObjectMetadata
77 from gslib.translation_helper import DEFAULT_CONTENT_TYPE
78 from gslib.translation_helper import GenerationFromUrlAndString
79 from gslib.translation_helper import ObjectMetadataFromHeaders
80 from gslib.translation_helper import PreconditionsFromHeaders
81 from gslib.translation_helper import S3MarkerAclFromObjectMetadata
82 from gslib.util import CreateLock
83 from gslib.util import CreateTrackerDirIfNeeded
84 from gslib.util import DEFAULT_FILE_BUFFER_SIZE
85 from gslib.util import GetCloudApiInstance
86 from gslib.util import GetFileSize
87 from gslib.util import GetStreamFromFileUrl
88 from gslib.util import HumanReadableToBytes
89 from gslib.util import IS_WINDOWS
90 from gslib.util import IsCloudSubdirPlaceholder
91 from gslib.util import MakeHumanReadable
92 from gslib.util import MIN_SIZE_COMPUTE_LOGGING
93 from gslib.util import ResumableThreshold
94 from gslib.util import TEN_MB
95 from gslib.util import UTF8
96 from gslib.wildcard_iterator import CreateWildcardIterator
97
98 # pylint: disable=g-import-not-at-top
99 if IS_WINDOWS:
100 import msvcrt
101 from ctypes import c_int
102 from ctypes import c_uint64
103 from ctypes import c_char_p
104 from ctypes import c_wchar_p
105 from ctypes import windll
106 from ctypes import POINTER
107 from ctypes import WINFUNCTYPE
108 from ctypes import WinError
109
110 # Declare copy_helper_opts as a global because namedtuple isn't aware of
111 # assigning to a class member (which breaks pickling done by multiprocessing).
112 # For details see
113 # http://stackoverflow.com/questions/16377215/how-to-pickle-a-namedtuple-instance-correctly
114 # Similarly can't pickle logger.
115 # pylint: disable=global-at-module-level
global global_copy_helper_opts, global_logger

# Prefix used when building temporary component object names for parallel
# composite uploads; the path makes the origin of the components obvious.
PARALLEL_UPLOAD_TEMP_NAMESPACE = (
    u'/gsutil/tmp/parallel_composite_uploads/for_details_see/gsutil_help_cp/')

# Static salt mixed into hashed component names to avoid colliding with any
# user-created object name.
PARALLEL_UPLOAD_STATIC_SALT = u"""
PARALLEL_UPLOAD_SALT_TO_PREVENT_COLLISIONS.
The theory is that no user will have prepended this to the front of
one of their object names and then done an MD5 hash of the name, and
then prepended PARALLEL_UPLOAD_TEMP_NAMESPACE to the front of their object
name. Note that there will be no problems with object name length since we
hash the original name.
"""

# Format string taking (tracker_file_name, error_text), used when a tracker
# file cannot be written.
TRACKER_FILE_UNWRITABLE_EXCEPTION_TEXT = (
    'Couldn\'t write tracker file (%s): %s. This can happen if gsutil is '
    'configured to save tracker files to an unwritable directory)')

# When uploading a file, get the following fields in the response for
# filling in command output and manifests.
UPLOAD_RETURN_FIELDS = ['crc32c', 'generation', 'md5Hash', 'size']

# This tuple is used only to encapsulate the arguments needed for
# command.Apply() in the parallel composite upload case.
# Note that content_type is used instead of a full apitools Object() because
# apitools objects are not picklable.
# filename: String name of file.
# file_start: start byte of file (may be in the middle of a file for
#     partitioned files).
# file_length: length of upload (may not be the entire length of a file for
#     partitioned files).
# src_url: FileUrl describing the source file.
# dst_url: CloudUrl describing the destination component file.
# canned_acl: canned_acl to apply to the uploaded file/component.
# content_type: content-type for final object, used for setting content-type
#     of components and final object.
# tracker_file: tracker file for this component.
# tracker_file_lock: tracker file lock for tracker file(s).
PerformParallelUploadFileToObjectArgs = namedtuple(
    'PerformParallelUploadFileToObjectArgs',
    'filename file_start file_length src_url dst_url canned_acl '
    'content_type tracker_file tracker_file_lock')

# (object_name, generation) pair read back from a parallel upload tracker
# file, identifying an already-uploaded component.
ObjectFromTracker = namedtuple('ObjectFromTracker',
                               'object_name generation')

# The maximum length of a file name can vary wildly between different
# operating systems, so we always ensure that tracker files are less
# than 100 characters in order to avoid any such issues.
MAX_TRACKER_FILE_NAME_LENGTH = 100

# TODO: Refactor this file to be less cumbersome. In particular, some of the
# different paths (e.g., uploading a file to an object vs. downloading an
# object to a file) could be split into separate files.

# Chunk size to use while zipping/unzipping gzip files.
GZIP_CHUNK_SIZE = 8192

# Object size (150 MiB) at or above which gsutil suggests using parallel
# composite uploads.
PARALLEL_COMPOSITE_SUGGESTION_THRESHOLD = 150 * 1024 * 1024

# Tracks whether the parallel composite upload suggestion has already been
# shown, so it is printed at most once per invocation.
suggested_parallel_composites = False
177
178
class TrackerFileType(object):
  """String constants naming the kinds of tracker files gsutil writes."""

  UPLOAD = 'upload'
  DOWNLOAD = 'download'
  PARALLEL_UPLOAD = 'parallel_upload'
183
184
185 def _RmExceptionHandler(cls, e):
186 """Simple exception handler to allow post-completion status."""
187 cls.logger.error(str(e))
188
189
190 def _ParallelUploadCopyExceptionHandler(cls, e):
191 """Simple exception handler to allow post-completion status."""
192 cls.logger.error(str(e))
193 cls.op_failure_count += 1
194 cls.logger.debug('\n\nEncountered exception while copying:\n%s\n',
195 traceback.format_exc())
196
197
def _PerformParallelUploadFileToObject(cls, args, thread_state=None):
  """Function argument to Apply for performing parallel composite uploads.

  Args:
    cls: Calling Command class.
    args: PerformParallelUploadFileToObjectArgs tuple describing the target.
    thread_state: gsutil Cloud API instance to use for the operation.

  Returns:
    StorageUrl representing a successfully uploaded component.
  """
  component_part = FilePart(args.filename, args.file_start, args.file_length)
  gsutil_api = GetCloudApiInstance(cls, thread_state=thread_state)
  with component_part:
    # The component naming scheme makes collisions effectively impossible,
    # and specifying preconditions would only allow retries to reach a state
    # in which uploads always fail, so none are used.
    preconditions = None

    # Propagate the final object's content type (if provided) to the
    # component being uploaded.
    dst_object_metadata = apitools_messages.Object(
        name=args.dst_url.object_name,
        bucket=args.dst_url.bucket_name,
        contentType=args.content_type)

    try:
      if global_copy_helper_opts.canned_acl:
        # The JSON API has no canned ACL support; force the XML API for
        # upload/copy operations and restore the preference afterward.
        saved_prefer_api = gsutil_api.prefer_api
        gsutil_api.prefer_api = ApiSelector.XML
      upload_result = _UploadFileToObject(
          args.src_url, component_part, args.file_length, args.dst_url,
          dst_object_metadata, preconditions, gsutil_api, cls.logger, cls,
          _ParallelUploadCopyExceptionHandler, gzip_exts=None,
          allow_splitting=False)
    finally:
      if global_copy_helper_opts.canned_acl:
        gsutil_api.prefer_api = saved_prefer_api

    # Record the uploaded component in the tracker file so an interrupted
    # composite upload can be resumed later.
    component = upload_result[2]
    _AppendComponentTrackerToParallelUploadTrackerFile(
        args.tracker_file, component, args.tracker_file_lock)
    return upload_result
242
243
# Option bundle shared by CopyHelper and the copy-related code in CpCommand,
# RsyncCommand, and the Command base class.
CopyHelperOpts = namedtuple(
    'CopyHelperOpts',
    'perform_mv no_clobber daisy_chain read_args_from_stdin print_ver '
    'use_manifest preserve_acl canned_acl halt_at_byte')
254
255
# pylint: disable=global-variable-undefined
def CreateCopyHelperOpts(perform_mv=False, no_clobber=False, daisy_chain=False,
                         read_args_from_stdin=False, print_ver=False,
                         use_manifest=False, preserve_acl=False,
                         canned_acl=None, halt_at_byte=None):
  """Creates CopyHelperOpts for passing options to CopyHelper.

  The result is also installed as the module-level global_copy_helper_opts,
  which covers the union of options needed by CopyHelper and any copy-related
  functionality in CpCommand, RsyncCommand, or the Command class.

  Returns:
    The newly created CopyHelperOpts namedtuple.
  """
  global global_copy_helper_opts
  # Positional construction: argument order matches the CopyHelperOpts field
  # declaration above.
  global_copy_helper_opts = CopyHelperOpts(
      perform_mv, no_clobber, daisy_chain, read_args_from_stdin, print_ver,
      use_manifest, preserve_acl, canned_acl, halt_at_byte)
  return global_copy_helper_opts
276
277
# pylint: disable=global-variable-undefined
# pylint: disable=global-variable-not-assigned
def GetCopyHelperOpts():
  """Returns the module-level CopyHelperOpts namedtuple.

  CreateCopyHelperOpts must have been called first to populate the global;
  otherwise reading it raises NameError.
  """
  global global_copy_helper_opts
  return global_copy_helper_opts
284
285
def GetTrackerFilePath(dst_url, tracker_file_type, api_selector, src_url=None):
  """Gets the tracker file name described by the arguments.

  Public for testing purposes.

  Args:
    dst_url: Destination URL for tracker file.
    tracker_file_type: TrackerFileType for this operation.
    api_selector: API to use for this operation.
    src_url: Source URL for the source file name for parallel uploads.

  Returns:
    File path to tracker file.
  """
  resumable_tracker_dir = CreateTrackerDirIfNeeded()
  if tracker_file_type == TrackerFileType.UPLOAD:
    # Encode the dest bucket and object name into the tracker file name.
    res_tracker_file_name = (
        re.sub('[/\\\\]', '_', 'resumable_upload__%s__%s__%s.url' %
               (dst_url.bucket_name, dst_url.object_name, api_selector)))
  elif tracker_file_type == TrackerFileType.DOWNLOAD:
    # Encode the fully-qualified dest file name into the tracker file name.
    res_tracker_file_name = (
        re.sub('[/\\\\]', '_', 'resumable_download__%s__%s.etag' %
               (os.path.realpath(dst_url.object_name), api_selector)))
  elif tracker_file_type == TrackerFileType.PARALLEL_UPLOAD:
    # Encode the dest bucket and object names as well as the source file name
    # into the tracker file name.
    res_tracker_file_name = (
        re.sub('[/\\\\]', '_', 'parallel_upload__%s__%s__%s__%s.url' %
               (dst_url.bucket_name, dst_url.object_name,
                src_url, api_selector)))
  # NOTE(review): if tracker_file_type is not one of the three known types,
  # res_tracker_file_name is unbound below and a NameError results — confirm
  # all callers pass a TrackerFileType value.

  # Hashing keeps the name under MAX_TRACKER_FILE_NAME_LENGTH regardless of
  # how long the bucket/object/file names are.
  res_tracker_file_name = _HashFilename(res_tracker_file_name)
  tracker_file_name = '%s_%s' % (str(tracker_file_type).lower(),
                                 res_tracker_file_name)
  tracker_file_path = '%s%s%s' % (resumable_tracker_dir, os.sep,
                                  tracker_file_name)
  assert len(tracker_file_name) < MAX_TRACKER_FILE_NAME_LENGTH
  return tracker_file_path
326
327
def _SelectDownloadStrategy(src_obj_metadata, dst_url):
  """Get download strategy based on the source and dest objects.

  Args:
    src_obj_metadata: Object describing the source object.
    dst_url: Destination StorageUrl.

  Returns:
    gsutil Cloud API DownloadStrategy.
  """
  special_destination = False
  if dst_url.IsFileUrl():
    # os.devnull is checked by name first because os.stat doesn't work on
    # 'nul' in Windows.
    if dst_url.object_name == os.devnull:
      special_destination = True
    try:
      if stat.S_ISCHR(os.stat(dst_url.object_name).st_mode):
        # Character devices (e.g. /dev/null) can't hold a partial download.
        special_destination = True
    except OSError:
      pass

  # Resumable downloads only make sense for sufficiently large objects going
  # to a regular file.
  if special_destination or src_obj_metadata.size < ResumableThreshold():
    return CloudApi.DownloadStrategy.ONE_SHOT
  return CloudApi.DownloadStrategy.RESUMABLE
354
355
356 def _GetUploadTrackerData(tracker_file_name, logger):
357 """Checks for an upload tracker file and creates one if it does not exist.
358
359 Args:
360 tracker_file_name: Tracker file name for this upload.
361 logger: for outputting log messages.
362
363 Returns:
364 Serialization data if the tracker file already exists (resume existing
365 upload), None otherwise.
366 """
367 tracker_file = None
368
369 # If we already have a matching tracker file, get the serialization data
370 # so that we can resume the upload.
371 try:
372 tracker_file = open(tracker_file_name, 'r')
373 tracker_data = tracker_file.read()
374 return tracker_data
375 except IOError as e:
376 # Ignore non-existent file (happens first time a upload
377 # is attempted on an object), but warn user for other errors.
378 if e.errno != errno.ENOENT:
379 logger.warn('Couldn\'t read upload tracker file (%s): %s. Restarting '
380 'upload from scratch.', tracker_file_name, e.strerror)
381 finally:
382 if tracker_file:
383 tracker_file.close()
384
385
def _ReadOrCreateDownloadTrackerFile(src_obj_metadata, dst_url,
                                     api_selector):
  """Checks for a download tracker file and creates one if it does not exist.

  Args:
    src_obj_metadata: Metadata for the source object. Must include
                      etag.
    dst_url: Destination file StorageUrl.
    api_selector: API mode to use (for tracker file naming).

  Returns:
    True if the tracker file already exists (resume existing download),
    False if we created a new tracker file (new download).

  Raises:
    CommandException: if a new tracker file cannot be written.
  """
  assert src_obj_metadata.etag
  tracker_file_name = GetTrackerFilePath(
      dst_url, TrackerFileType.DOWNLOAD, api_selector)

  # Check to see if we already have a matching tracker file.
  try:
    with open(tracker_file_name, 'r') as tracker_file:
      etag_value = tracker_file.readline().rstrip('\n')
      if etag_value == src_obj_metadata.etag:
        return True
  except IOError as e:
    # Ignore non-existent file (happens first time a download
    # is attempted on an object), but warn user for other errors.
    if e.errno != errno.ENOENT:
      print('Couldn\'t read URL tracker file (%s): %s. Restarting '
            'download from scratch.' %
            (tracker_file_name, e.strerror))

  # Otherwise, create a new tracker file and start from scratch. 0o600
  # (Python-2/3 portable spelling of the 0600 octal literal) restricts the
  # tracker file to the current user.
  try:
    with os.fdopen(os.open(tracker_file_name,
                           os.O_WRONLY | os.O_CREAT, 0o600), 'w') as tf:
      tf.write('%s\n' % src_obj_metadata.etag)
    return False
  except IOError as e:
    raise CommandException(TRACKER_FILE_UNWRITABLE_EXCEPTION_TEXT %
                           (tracker_file_name, e.strerror))
434
435
436 def _DeleteTrackerFile(tracker_file_name):
437 if tracker_file_name and os.path.exists(tracker_file_name):
438 os.unlink(tracker_file_name)
439
440
def InsistDstUrlNamesContainer(exp_dst_url, have_existing_dst_container,
                               command_name):
  """Ensures the destination URL names a container.

  Acceptable containers include directory, bucket, bucket
  subdir, and non-existent bucket subdir.

  Args:
    exp_dst_url: Wildcard-expanded destination StorageUrl.
    have_existing_dst_container: bool indicator of whether exp_dst_url
      names a container (directory, bucket, or existing bucket subdir).
    command_name: Name of command making call. May not be the same as the
      calling class's self.command_name in the case of commands implemented
      atop other commands (like mv command).

  Raises:
    CommandException: if the URL being checked does not name a container.
  """
  is_non_container_file = (
      exp_dst_url.IsFileUrl() and not exp_dst_url.IsDirectory())
  is_nonexistent_bucket_dest = (
      exp_dst_url.IsCloudUrl() and exp_dst_url.IsBucket()
      and not have_existing_dst_container)
  if is_non_container_file or is_nonexistent_bucket_dest:
    raise CommandException('Destination URL must name a directory, bucket, '
                           'or bucket\nsubdirectory for the multiple '
                           'source form of the %s command.' % command_name)
465
466
467 def _ShouldTreatDstUrlAsBucketSubDir(have_multiple_srcs, dst_url,
468 have_existing_dest_subdir,
469 src_url_names_container,
470 recursion_requested):
471 """Checks whether dst_url should be treated as a bucket "sub-directory".
472
473 The decision about whether something constitutes a bucket "sub-directory"
474 depends on whether there are multiple sources in this request and whether
475 there is an existing bucket subdirectory. For example, when running the
476 command:
477 gsutil cp file gs://bucket/abc
478 if there's no existing gs://bucket/abc bucket subdirectory we should copy
479 file to the object gs://bucket/abc. In contrast, if
480 there's an existing gs://bucket/abc bucket subdirectory we should copy
481 file to gs://bucket/abc/file. And regardless of whether gs://bucket/abc
482 exists, when running the command:
483 gsutil cp file1 file2 gs://bucket/abc
484 we should copy file1 to gs://bucket/abc/file1 (and similarly for file2).
485 Finally, for recursive copies, if the source is a container then we should
486 copy to a container as the target. For example, when running the command:
487 gsutil cp -r dir1 gs://bucket/dir2
488 we should copy the subtree of dir1 to gs://bucket/dir2.
489
490 Note that we don't disallow naming a bucket "sub-directory" where there's
491 already an object at that URL. For example it's legitimate (albeit
492 confusing) to have an object called gs://bucket/dir and
493 then run the command
494 gsutil cp file1 file2 gs://bucket/dir
495 Doing so will end up with objects gs://bucket/dir, gs://bucket/dir/file1,
496 and gs://bucket/dir/file2.
497
498 Args:
499 have_multiple_srcs: Bool indicator of whether this is a multi-source
500 operation.
501 dst_url: StorageUrl to check.
502 have_existing_dest_subdir: bool indicator whether dest is an existing
503 subdirectory.
504 src_url_names_container: bool indicator of whether the source URL
505 is a container.
506 recursion_requested: True if a recursive operation has been requested.
507
508 Returns:
509 bool indicator.
510 """
511 if have_existing_dest_subdir:
512 return True
513 if dst_url.IsCloudUrl():
514 return (have_multiple_srcs or
515 (src_url_names_container and recursion_requested))
516
517
518 def _ShouldTreatDstUrlAsSingleton(have_multiple_srcs,
519 have_existing_dest_subdir, dst_url,
520 recursion_requested):
521 """Checks that dst_url names a single file/object after wildcard expansion.
522
523 It is possible that an object path might name a bucket sub-directory.
524
525 Args:
526 have_multiple_srcs: Bool indicator of whether this is a multi-source
527 operation.
528 have_existing_dest_subdir: bool indicator whether dest is an existing
529 subdirectory.
530 dst_url: StorageUrl to check.
531 recursion_requested: True if a recursive operation has been requested.
532
533 Returns:
534 bool indicator.
535 """
536 if recursion_requested:
537 return False
538 if dst_url.IsFileUrl():
539 return not dst_url.IsDirectory()
540 else: # dst_url.IsCloudUrl()
541 return (not have_multiple_srcs and
542 not have_existing_dest_subdir and
543 dst_url.IsObject())
544
545
def ConstructDstUrl(src_url, exp_src_url, src_url_names_container,
                    have_multiple_srcs, exp_dst_url, have_existing_dest_subdir,
                    recursion_requested):
  """Constructs the destination URL for a given exp_src_url/exp_dst_url pair.

  Uses context-dependent naming rules that mimic Linux cp and mv behavior.

  Args:
    src_url: Source StorageUrl to be copied.
    exp_src_url: Single StorageUrl from wildcard expansion of src_url.
    src_url_names_container: True if src_url names a container (including the
        case of a wildcard-named bucket subdir (like gs://bucket/abc,
        where gs://bucket/abc/* matched some objects).
    have_multiple_srcs: True if this is a multi-source request. This can be
        true if src_url wildcard-expanded to multiple URLs or if there were
        multiple source URLs in the request.
    exp_dst_url: the expanded StorageUrl requested for the cp destination.
        Final written path is constructed from this plus a context-dependent
        variant of src_url.
    have_existing_dest_subdir: bool indicator whether dest is an existing
        subdirectory.
    recursion_requested: True if a recursive operation has been requested.

  Returns:
    StorageUrl to use for copy.

  Raises:
    CommandException if destination object name not specified for
    source and source is a stream.
  """
  if _ShouldTreatDstUrlAsSingleton(
      have_multiple_srcs, have_existing_dest_subdir, exp_dst_url,
      recursion_requested):
    # We're copying one file or object to one file or object.
    return exp_dst_url

  if exp_src_url.IsFileUrl() and exp_src_url.IsStream():
    # A stream has no source name to derive the destination name from.
    if have_existing_dest_subdir:
      raise CommandException('Destination object name needed when '
                             'source is a stream')
    return exp_dst_url

  if not recursion_requested and not have_multiple_srcs:
    # We're copying one file or object to a subdirectory. Append final comp
    # of exp_src_url to exp_dst_url.
    src_final_comp = exp_src_url.object_name.rpartition(src_url.delim)[-1]
    return StorageUrlFromString('%s%s%s' % (
        exp_dst_url.url_string.rstrip(exp_dst_url.delim),
        exp_dst_url.delim, src_final_comp))

  # Else we're copying multiple sources to a directory, bucket, or a bucket
  # "sub-directory".

  # Ensure exp_dst_url ends in delim char if we're doing a multi-src copy or
  # a copy to a directory. (The check for copying to a directory needs
  # special-case handling so that the command:
  #   gsutil cp gs://bucket/obj dir
  # will turn into file://dir/ instead of file://dir -- the latter would cause
  # the file "dirobj" to be created.)
  # Note: need to check have_multiple_srcs or src_url.names_container()
  # because src_url could be a bucket containing a single object, named
  # as gs://bucket.
  if ((have_multiple_srcs or src_url_names_container or
       (exp_dst_url.IsFileUrl() and exp_dst_url.IsDirectory()))
      and not exp_dst_url.url_string.endswith(exp_dst_url.delim)):
    exp_dst_url = StorageUrlFromString('%s%s' % (exp_dst_url.url_string,
                                                 exp_dst_url.delim))

  # Making naming behavior match how things work with local Linux cp and mv
  # operations depends on many factors, including whether the destination is a
  # container, the plurality of the source(s), and whether the mv command is
  # being used:
  # 1. For the "mv" command that specifies a non-existent destination subdir,
  #    renaming should occur at the level of the src subdir, vs appending that
  #    subdir beneath the dst subdir like is done for copying. For example:
  #      gsutil rm -R gs://bucket
  #      gsutil cp -R dir1 gs://bucket
  #      gsutil cp -R dir2 gs://bucket/subdir1
  #      gsutil mv gs://bucket/subdir1 gs://bucket/subdir2
  #    would (if using cp naming behavior) end up with paths like:
  #      gs://bucket/subdir2/subdir1/dir2/.svn/all-wcprops
  #    whereas mv naming behavior should result in:
  #      gs://bucket/subdir2/dir2/.svn/all-wcprops
  # 2. Copying from directories, buckets, or bucket subdirs should result in
  #    objects/files mirroring the source directory hierarchy. For example:
  #      gsutil cp dir1/dir2 gs://bucket
  #    should create the object gs://bucket/dir2/file2, assuming dir1/dir2
  #    contains file2).
  #    To be consistent with Linux cp behavior, there's one more wrinkle when
  #    working with subdirs: The resulting object names depend on whether the
  #    destination subdirectory exists. For example, if gs://bucket/subdir
  #    exists, the command:
  #      gsutil cp -R dir1/dir2 gs://bucket/subdir
  #    should create objects named like gs://bucket/subdir/dir2/a/b/c. In
  #    contrast, if gs://bucket/subdir does not exist, this same command
  #    should create objects named like gs://bucket/subdir/a/b/c.
  # 3. Copying individual files or objects to dirs, buckets or bucket subdirs
  #    should result in objects/files named by the final source file name
  #    component. Example:
  #      gsutil cp dir1/*.txt gs://bucket
  #    should create the objects gs://bucket/f1.txt and gs://bucket/f2.txt,
  #    assuming dir1 contains f1.txt and f2.txt.

  recursive_move_to_new_subdir = False
  if (global_copy_helper_opts.perform_mv and recursion_requested
      and src_url_names_container and not have_existing_dest_subdir):
    # Case 1. Handle naming rules for bucket subdir mv. Here we want to
    # line up the src_url against its expansion, to find the base to build
    # the new name. For example, running the command:
    #   gsutil mv gs://bucket/abcd gs://bucket/xyz
    # when processing exp_src_url=gs://bucket/abcd/123
    # exp_src_url_tail should become /123
    # Note: mv.py code disallows wildcard specification of source URL.
    recursive_move_to_new_subdir = True
    exp_src_url_tail = (
        exp_src_url.url_string[len(src_url.url_string):])
    dst_key_name = '%s/%s' % (exp_dst_url.object_name.rstrip('/'),
                              exp_src_url_tail.strip('/'))

  elif src_url_names_container and (exp_dst_url.IsCloudUrl() or
                                    exp_dst_url.IsDirectory()):
    # Case 2. Container copy to a destination other than a file.
    # Build dst_key_name from subpath of exp_src_url past
    # where src_url ends. For example, for src_url=gs://bucket/ and
    # exp_src_url=gs://bucket/src_subdir/obj, dst_key_name should be
    # src_subdir/obj.
    src_url_path_sans_final_dir = GetPathBeforeFinalDir(src_url)
    dst_key_name = exp_src_url.versionless_url_string[
        len(src_url_path_sans_final_dir):].lstrip(src_url.delim)
    # Handle case where dst_url is a non-existent subdir.
    if not have_existing_dest_subdir:
      dst_key_name = dst_key_name.partition(src_url.delim)[-1]
    # Handle special case where src_url was a directory named with '.' or
    # './', so that running a command like:
    #   gsutil cp -r . gs://dest
    # will produce obj names of the form gs://dest/abc instead of
    # gs://dest/./abc.
    if dst_key_name.startswith('.%s' % os.sep):
      dst_key_name = dst_key_name[2:]

  else:
    # Case 3. Single-name copy: use the final source name component.
    dst_key_name = exp_src_url.object_name.rpartition(src_url.delim)[-1]

  # Prefix dst_key_name with the destination subdir path (unless case 1
  # already produced a full destination name).
  if (not recursive_move_to_new_subdir and (
      exp_dst_url.IsFileUrl() or _ShouldTreatDstUrlAsBucketSubDir(
          have_multiple_srcs, exp_dst_url, have_existing_dest_subdir,
          src_url_names_container, recursion_requested))):
    if exp_dst_url.object_name and exp_dst_url.object_name.endswith(
        exp_dst_url.delim):
      dst_key_name = '%s%s%s' % (
          exp_dst_url.object_name.rstrip(exp_dst_url.delim),
          exp_dst_url.delim, dst_key_name)
    else:
      delim = exp_dst_url.delim if exp_dst_url.object_name else ''
      dst_key_name = '%s%s%s' % (exp_dst_url.object_name or '',
                                 delim, dst_key_name)

  # Translate source delimiters to destination delimiters (e.g. cloud '/'
  # vs local os.sep) in the final object name.
  new_exp_dst_url = exp_dst_url.Clone()
  new_exp_dst_url.object_name = dst_key_name.replace(src_url.delim,
                                                     exp_dst_url.delim)
  return new_exp_dst_url
708
709
710 def _CreateDigestsFromDigesters(digesters):
711 digests = {}
712 if digesters:
713 for alg in digesters:
714 digests[alg] = base64.encodestring(
715 digesters[alg].digest()).rstrip('\n')
716 return digests
717
718
def _CreateDigestsFromLocalFile(logger, algs, file_name, src_obj_metadata):
  """Creates a base64 CRC32C and/or MD5 digest from file_name.

  Args:
    logger: for outputting log messages.
    algs: list of algorithms to compute.
    file_name: file to digest.
    src_obj_metadata: metadata of source object.

  Returns:
    Dict of algorithm name : base 64 encoded digest
  """
  hash_dict = {}
  if 'md5' in algs:
    # Hashing a large file can take a while; let the user know we're working.
    if src_obj_metadata.size and src_obj_metadata.size > TEN_MB:
      logger.info(
          'Computing MD5 for %s...', file_name)
    hash_dict['md5'] = md5()
  if 'crc32c' in algs:
    hash_dict['crc32c'] = crcmod.predefined.Crc('crc-32c')
  # Stream the file through all requested digesters in a single pass,
  # reporting hashing progress along the way.
  with open(file_name, 'rb') as fp:
    CalculateHashesFromContents(
        fp, hash_dict, ProgressCallbackWithBackoff(
            src_obj_metadata.size,
            FileProgressCallbackHandler(
                ConstructAnnounceText('Hashing', file_name), logger).call))
  digests = {}
  for alg_name, digest in hash_dict.iteritems():
    digests[alg_name] = Base64EncodeHash(digest.hexdigest())
  return digests
749
750
def _CheckCloudHashes(logger, src_url, dst_url, src_obj_metadata,
                      dst_obj_metadata):
  """Validates integrity of two cloud objects copied via daisy-chain.

  Args:
    logger: for outputting log messages.
    src_url: CloudUrl for source cloud object.
    dst_url: CloudUrl for destination cloud object.
    src_obj_metadata: Cloud Object metadata for object being downloaded from.
    dst_obj_metadata: Cloud Object metadata for object being uploaded to.

  Raises:
    CommandException: if cloud digests don't match local digests.
  """
  download_hashes = {}
  if src_obj_metadata.md5Hash:
    download_hashes['md5'] = src_obj_metadata.md5Hash
  if src_obj_metadata.crc32c:
    download_hashes['crc32c'] = src_obj_metadata.crc32c

  upload_hashes = {}
  if dst_obj_metadata.md5Hash:
    upload_hashes['md5'] = dst_obj_metadata.md5Hash
  if dst_obj_metadata.crc32c:
    upload_hashes['crc32c'] = dst_obj_metadata.crc32c

  checked_one = False
  for alg, upload_b64_digest in upload_hashes.iteritems():
    # Only compare algorithms present on both sides.
    download_b64_digest = download_hashes.get(alg)
    if download_b64_digest is None:
      continue
    logger.debug(
        'Comparing source vs destination %s-checksum for %s. (%s/%s)', alg,
        dst_url, download_b64_digest, upload_b64_digest)
    if download_b64_digest != upload_b64_digest:
      raise CommandException(
          '%s signature for source object (%s) doesn\'t match '
          'destination object digest (%s). Object (%s) will be deleted.' % (
              alg, download_b64_digest, upload_b64_digest, dst_url))
    checked_one = True

  if not checked_one:
    # One known way this can currently happen is when downloading objects
    # larger than 5GB from S3 (for which the etag is not an MD5).
    logger.warn(
        'WARNING: Found no hashes to validate object downloaded from %s and '
        'uploaded to %s. Integrity cannot be assured without hashes.',
        src_url, dst_url)
798
799
def _CheckHashes(logger, obj_url, obj_metadata, file_name, digests,
                 is_upload=False):
  """Validates integrity by comparing cloud digest to local digest.

  Args:
    logger: for outputting log messages.
    obj_url: CloudUrl for cloud object.
    obj_metadata: Cloud Object being downloaded from or uploaded to.
    file_name: Local file name on disk being downloaded to or uploaded from.
    digests: Computed Digests for the object.
    is_upload: If true, comparing for an uploaded object (controls logging).

  Raises:
    CommandException: if cloud digests don't match local digests.
  """
  cloud_hashes = {}
  if obj_metadata.md5Hash:
    cloud_hashes['md5'] = obj_metadata.md5Hash.rstrip('\n')
  if obj_metadata.crc32c:
    cloud_hashes['crc32c'] = obj_metadata.crc32c.rstrip('\n')

  checked_one = False
  for alg, local_b64_digest in digests.items():
    # Skip algorithms the cloud object doesn't report.
    if alg not in cloud_hashes:
      continue
    cloud_b64_digest = cloud_hashes[alg]
    logger.debug(
        'Comparing local vs cloud %s-checksum for %s. (%s/%s)', alg, file_name,
        local_b64_digest, cloud_b64_digest)
    if local_b64_digest != cloud_b64_digest:
      raise CommandException(
          '%s signature computed for local file (%s) doesn\'t match '
          'cloud-supplied digest (%s). %s (%s) will be deleted.' % (
              alg, local_b64_digest, cloud_b64_digest,
              'Cloud object' if is_upload else 'Local file',
              obj_url if is_upload else file_name))
    checked_one = True

  if checked_one:
    return
  if is_upload:
    logger.warn(
        'WARNING: Found no hashes to validate object uploaded to %s. '
        'Integrity cannot be assured without hashes.', obj_url)
  else:
    # One known way this can currently happen is when downloading objects
    # larger than 5GB from S3 (for which the etag is not an MD5).
    logger.warn(
        'WARNING: Found no hashes to validate object downloaded to %s. '
        'Integrity cannot be assured without hashes.', file_name)
852
853
def IsNoClobberServerException(e):
  """Checks to see if the server attempted to clobber a file.

  In this case we specified via a precondition that we didn't want the file
  clobbered.

  Args:
    e: The Exception that was generated by a failed copy operation

  Returns:
    bool indicator - True indicates that the server did attempt to clobber
    an existing file.
  """
  if isinstance(e, PreconditionException):
    return True
  # A resumable upload rejected with HTTP 412 (Precondition Failed) is the
  # same condition surfaced through a different exception type.
  return isinstance(e, ResumableUploadException) and '412' in e.message
869
870
def CheckForDirFileConflict(exp_src_url, dst_url):
  """Checks whether copying exp_src_url into dst_url is not possible.

  This happens if a directory exists in local file system where a file
  needs to go or vice versa. In that case we print an error message and
  exits. Example: if the file "./x" exists and you try to do:
    gsutil cp gs://mybucket/x/y .
  the request can't succeed because it requires a directory where
  the file x exists.

  Note that we don't enforce any corresponding restrictions for buckets,
  because the flat namespace semantics for buckets doesn't prohibit such
  cases the way hierarchical file systems do. For example, if a bucket
  contains an object called gs://bucket/dir and then you run the command:
    gsutil cp file1 file2 gs://bucket/dir
  you'll end up with objects gs://bucket/dir, gs://bucket/dir/file1, and
  gs://bucket/dir/file2.

  Args:
    exp_src_url: Expanded source StorageUrl.
    dst_url: Destination StorageUrl.

  Raises:
    CommandException: if errors encountered.
  """
  # Only file destinations can conflict with the local file system.
  if dst_url.IsCloudUrl():
    return
  target_path = dst_url.object_name
  parent_dir = os.path.dirname(target_path)
  if os.path.isfile(parent_dir):
    raise CommandException('Cannot retrieve %s because a file exists '
                           'where a directory needs to be created (%s).' %
                           (exp_src_url.url_string, parent_dir))
  if os.path.isdir(target_path):
    raise CommandException('Cannot retrieve %s because a directory exists '
                           '(%s) where the file needs to be created.' %
                           (exp_src_url.url_string, target_path))
909
910
def _PartitionFile(fp, file_size, src_url, content_type, canned_acl,
                   dst_bucket_url, random_prefix, tracker_file,
                   tracker_file_lock):
  """Partitions a file into FilePart objects to be uploaded and later composed.

  These objects, when composed, will match the original file. This entails
  splitting the file into parts, naming and forming a destination URL for each
  part, and also providing the PerformParallelUploadFileToObjectArgs
  corresponding to each part.

  Args:
    fp: The file object to be partitioned.
    file_size: The size of fp, in bytes.
    src_url: Source FileUrl from the original command.
    content_type: content type for the component and final objects.
    canned_acl: The user-provided canned_acl, if applicable.
    dst_bucket_url: CloudUrl for the destination bucket.
    random_prefix: The randomly-generated prefix used to prevent collisions
                   among the temporary component names.
    tracker_file: The path to the parallel composite upload tracker file.
    tracker_file_lock: The lock protecting access to the tracker file.

  Returns:
    dst_args: Dict mapping each temporary component object name to the
        PerformParallelUploadFileToObjectArgs used to upload that component.
  """
  parallel_composite_upload_component_size = HumanReadableToBytes(
      config.get('GSUtil', 'parallel_composite_upload_component_size',
                 DEFAULT_PARALLEL_COMPOSITE_UPLOAD_COMPONENT_SIZE))
  (num_components, component_size) = _GetPartitionInfo(
      file_size, MAX_COMPOSE_ARITY, parallel_composite_upload_component_size)

  dst_args = {}  # Arguments to create commands and pass to subprocesses.
  for i in range(num_components):
    # "Salt" the object name with something a user is very unlikely to have
    # used in an object name, then hash the extended name to make sure
    # we don't run into problems with name length. Using a deterministic
    # naming scheme for the temporary components allows users to take
    # advantage of resumable uploads for each component.
    encoded_name = (PARALLEL_UPLOAD_STATIC_SALT + fp.name).encode(UTF8)
    content_md5 = md5()
    content_md5.update(encoded_name)
    digest = content_md5.hexdigest()
    temp_file_name = (random_prefix + PARALLEL_UPLOAD_TEMP_NAMESPACE +
                      digest + '_' + str(i))
    tmp_dst_url = dst_bucket_url.Clone()
    tmp_dst_url.object_name = temp_file_name

    if i < (num_components - 1):
      # Every component except possibly the last is the same size.
      file_part_length = component_size
    else:
      # The last component just gets all of the remaining bytes.
      file_part_length = (file_size - ((num_components - 1) * component_size))
    offset = i * component_size
    func_args = PerformParallelUploadFileToObjectArgs(
        fp.name, offset, file_part_length, src_url, tmp_dst_url, canned_acl,
        content_type, tracker_file, tracker_file_lock)
    dst_args[temp_file_name] = func_args

  return dst_args
973
974
def _DoParallelCompositeUpload(fp, src_url, dst_url, dst_obj_metadata,
                               canned_acl, file_size, preconditions, gsutil_api,
                               command_obj, copy_exception_handler):
  """Uploads a local file to a cloud object using parallel composite upload.

  The file is partitioned into parts, and then the parts are uploaded in
  parallel, composed to form the original destination object, and deleted.

  Args:
    fp: The file object to be uploaded.
    src_url: FileUrl representing the local file.
    dst_url: CloudUrl representing the destination file.
    dst_obj_metadata: apitools Object describing the destination object.
    canned_acl: The canned acl to apply to the object, if any.
    file_size: The size of the source file in bytes.
    preconditions: Cloud API Preconditions for the final object.
    gsutil_api: gsutil Cloud API instance to use.
    command_obj: Command object (for calling Apply).
    copy_exception_handler: Copy exception handler (for use in Apply).

  Returns:
    Elapsed upload time, uploaded Object with generation, crc32c, and size
    fields populated.

  Raises:
    CommandException: if one or more components failed to upload (the
        temporary component objects are left in place so a retry can
        resume from them).
  """
  start_time = time.time()
  dst_bucket_url = StorageUrlFromString(dst_url.bucket_url_string)
  api_selector = gsutil_api.GetApiSelector(provider=dst_url.scheme)
  # Determine which components, if any, have already been successfully
  # uploaded.
  tracker_file = GetTrackerFilePath(dst_url, TrackerFileType.PARALLEL_UPLOAD,
                                    api_selector, src_url)
  tracker_file_lock = CreateLock()
  (random_prefix, existing_components) = (
      _ParseParallelUploadTrackerFile(tracker_file, tracker_file_lock))

  # Create the initial tracker file for the upload.
  _CreateParallelUploadTrackerFile(tracker_file, random_prefix,
                                   existing_components, tracker_file_lock)

  # Get the set of all components that should be uploaded.
  dst_args = _PartitionFile(
      fp, file_size, src_url, dst_obj_metadata.contentType, canned_acl,
      dst_bucket_url, random_prefix, tracker_file, tracker_file_lock)

  (components_to_upload, existing_components, existing_objects_to_delete) = (
      FilterExistingComponents(dst_args, existing_components, dst_bucket_url,
                               gsutil_api))

  # In parallel, copy all of the file parts that haven't already been
  # uploaded to temporary objects.
  cp_results = command_obj.Apply(
      _PerformParallelUploadFileToObject, components_to_upload,
      copy_exception_handler, ('op_failure_count', 'total_bytes_transferred'),
      arg_checker=gslib.command.DummyArgChecker,
      parallel_operations_override=True, should_return_results=True)
  uploaded_components = []
  for cp_result in cp_results:
    uploaded_components.append(cp_result[2])
  # existing_components were carried over from a prior, interrupted run of
  # this same upload (per the tracker file).
  components = uploaded_components + existing_components

  if len(components) == len(dst_args):
    # Only try to compose if all of the components were uploaded successfully.

    def _GetComponentNumber(component):
      # Component names end with '_<index>' (see _PartitionFile).
      return int(component.object_name[component.object_name.rfind('_')+1:])
    # Sort the components so that they will be composed in the correct order.
    components = sorted(components, key=_GetComponentNumber)

    request_components = []
    for component_url in components:
      src_obj_metadata = (
          apitools_messages.ComposeRequest.SourceObjectsValueListEntry(
              name=component_url.object_name))
      if component_url.HasGeneration():
        # Python 2 long: the apitools message expects a 64-bit integer.
        src_obj_metadata.generation = long(component_url.generation)
      request_components.append(src_obj_metadata)

    composed_object = gsutil_api.ComposeObject(
        request_components, dst_obj_metadata, preconditions=preconditions,
        provider=dst_url.scheme, fields=['generation', 'crc32c', 'size'])

    try:
      # Make sure only to delete things that we know were successfully
      # uploaded (as opposed to all of the objects that we attempted to
      # create) so that we don't delete any preexisting objects, except for
      # those that were uploaded by a previous, failed run and have since
      # changed (but still have an old generation lying around).
      objects_to_delete = components + existing_objects_to_delete
      command_obj.Apply(_DeleteObjectFn, objects_to_delete, _RmExceptionHandler,
                        arg_checker=gslib.command.DummyArgChecker,
                        parallel_operations_override=True)
    except Exception:  # pylint: disable=broad-except
      # If some of the delete calls fail, don't cause the whole command to
      # fail. The copy was successful iff the compose call succeeded, so
      # reduce this to a warning.
      logging.warning(
          'Failed to delete some of the following temporary objects:\n' +
          '\n'.join(dst_args.keys()))
    finally:
      # The compose succeeded, so the tracker file has served its purpose.
      with tracker_file_lock:
        if os.path.exists(tracker_file):
          os.unlink(tracker_file)
  else:
    # Some of the components failed to upload. In this case, we want to exit
    # without deleting the objects.
    raise CommandException(
        'Some temporary components were not uploaded successfully. '
        'Please retry this upload.')

  elapsed_time = time.time() - start_time
  return elapsed_time, composed_object
1086
1087
def _ShouldDoParallelCompositeUpload(logger, allow_splitting, src_url, dst_url,
                                     file_size, canned_acl=None):
  """Determines whether parallel composite upload strategy should be used.

  Args:
    logger: for outputting log messages.
    allow_splitting: If false, then this function returns false.
    src_url: FileUrl corresponding to a local file.
    dst_url: CloudUrl corresponding to destination cloud object.
    file_size: The size of the source file, in bytes.
    canned_acl: Canned ACL to apply to destination object, if any.

  Returns:
    True iff a parallel upload should be performed on the source file.
  """
  global suggested_parallel_composites
  composite_threshold = HumanReadableToBytes(config.get(
      'GSUtil', 'parallel_composite_upload_threshold',
      DEFAULT_PARALLEL_COMPOSITE_UPLOAD_THRESHOLD))

  # Splitting must be permitted, the source can't be a stream (streams can't
  # be partitioned), and compose is a GCS-only operation.
  eligible = (allow_splitting
              and not src_url.IsStream()
              and dst_url.scheme == 'gs'
              and not canned_acl)  # TODO: Implement canned ACL support for compose.

  # Since parallel composite uploads are disabled by default, make user aware
  # of them.
  # TODO: Once compiled crcmod is being distributed by major Linux
  # distributions remove this check.
  if (eligible and composite_threshold == 0
      and file_size >= PARALLEL_COMPOSITE_SUGGESTION_THRESHOLD
      and not suggested_parallel_composites):
    logger.info('\n'.join(textwrap.wrap(
        '==> NOTE: You are uploading one or more large file(s), which would '
        'run significantly faster if you enable parallel composite uploads. '
        'This feature can be enabled by editing the '
        '"parallel_composite_upload_threshold" value in your .boto '
        'configuration file. However, note that if you do this you and any '
        'users that download such composite files will need to have a compiled '
        'crcmod installed (see "gsutil help crcmod").')) + '\n')
    # Only suggest once per invocation.
    suggested_parallel_composites = True

  return (eligible
          and composite_threshold > 0
          and file_size >= composite_threshold)
1134
1135
def ExpandUrlToSingleBlr(url_str, gsutil_api, debug, project_id):
  """Expands wildcard if present in url_str.

  Args:
    url_str: String representation of requested url.
    gsutil_api: gsutil Cloud API instance to use.
    debug: debug level to use (for iterators).
    project_id: project ID to use (for iterators).

  Returns:
    (exp_url, have_existing_dst_container)
    where exp_url is a StorageUrl
    and have_existing_dst_container is a bool indicating whether
    exp_url names an existing directory, bucket, or bucket subdirectory.
    In the case where we match a subdirectory AND an object, the
    object is returned.

  Raises:
    CommandException: if url_str matched more than 1 URL.
  """
  # Handle wildcarded url case.
  if ContainsWildcard(url_str):
    blr_expansion = list(CreateWildcardIterator(url_str, gsutil_api,
                                                debug=debug,
                                                project_id=project_id))
    if len(blr_expansion) != 1:
      raise CommandException('Destination (%s) must match exactly 1 URL' %
                             url_str)
    blr = blr_expansion[0]
    # BLR is either an OBJECT, PREFIX, or BUCKET; the latter two represent
    # directories.
    return (StorageUrlFromString(blr.url_string), not blr.IsObject())

  storage_url = StorageUrlFromString(url_str)

  # Handle non-wildcarded url:
  if storage_url.IsFileUrl():
    return (storage_url, storage_url.IsDirectory())

  # At this point we have a cloud URL.
  if storage_url.IsBucket():
    return (storage_url, True)

  # For object/prefix URLs check 3 cases: (a) if the name ends with '/' treat
  # as a subdir; otherwise, use the wildcard iterator with url to
  # find if (b) there's a Prefix matching url, or (c) name is of form
  # dir_$folder$ (and in both these cases also treat dir as a subdir).
  # Cloud subdirs are always considered to be an existing container.
  if IsCloudSubdirPlaceholder(storage_url):
    return (storage_url, True)

  # Check for the special case where we have a folder marker object
  folder_expansion = CreateWildcardIterator(
      url_str + '_$folder$', gsutil_api, debug=debug,
      project_id=project_id).IterAll(
          bucket_listing_fields=['name'])
  # We only need to know whether the listing is non-empty; returning from
  # inside the loop after the first result avoids listing further.
  for blr in folder_expansion:
    return (storage_url, True)

  blr_expansion = CreateWildcardIterator(url_str, gsutil_api,
                                         debug=debug,
                                         project_id=project_id).IterAll(
                                             bucket_listing_fields=['name'])
  # The URL names an existing subdirectory iff the listing contains a Prefix.
  for blr in blr_expansion:
    if blr.IsPrefix():
      return (storage_url, True)

  # No directory, bucket, or subdirectory exists at this URL.
  return (storage_url, False)
1204
1205
def FixWindowsNaming(src_url, dst_url):
  """Translates Windows pathnames to cloud pathnames.

  Rewrites the destination URL built by ConstructDstUrl().

  Args:
    src_url: Source StorageUrl to be copied.
    dst_url: The destination StorageUrl built by ConstructDstUrl().

  Returns:
    StorageUrl to use for copy.
  """
  windows_to_cloud = (src_url.IsFileUrl() and src_url.delim == '\\'
                      and dst_url.IsCloudUrl())
  if not windows_to_cloud:
    return dst_url
  # Backslash path separators become '/' object-name separators in the cloud.
  return StorageUrlFromString(dst_url.url_string.replace('\\', '/'))
1223
1224
def StdinIterator():
  """A generator function that returns lines from stdin."""
  for raw_line in sys.stdin:
    # Strip the trailing CR/LF (rstrip also drops any other trailing
    # whitespace, preserving historical behavior).
    yield raw_line.rstrip()
1230
1231
def SrcDstSame(src_url, dst_url):
  """Checks if src_url and dst_url represent the same object or file.

  We don't handle anything about hard or symbolic links.

  Args:
    src_url: Source StorageUrl.
    dst_url: Destination StorageUrl.

  Returns:
    Bool indicator.
  """
  if not (src_url.IsFileUrl() and dst_url.IsFileUrl()):
    # Cloud (or mixed) case: compare the full URL plus generation.
    return (src_url.url_string == dst_url.url_string and
            src_url.generation == dst_url.generation)
  # Normalize (e.g. a/b/./c -> a/b/c) so equivalent local paths compare equal.
  return (os.path.normpath(src_url.object_name) ==
          os.path.normpath(dst_url.object_name))
1252
1253
class _HaltingCopyCallbackHandler(object):
  """Test callback handler for intentionally stopping a resumable transfer.

  Attributes:
    is_upload: If True, raises ResumableUploadException when halting;
        otherwise raises ResumableDownloadException.
    display_url: StorageUrl of the transfer (kept for display/debugging).
    halt_at_byte: Byte count at or beyond which the transfer is halted.
    logger: Used to decide whether to print the halt notice.
  """

  def __init__(self, is_upload, display_url, halt_at_byte, logger):
    self.is_upload = is_upload
    self.display_url = display_url
    self.halt_at_byte = halt_at_byte
    self.logger = logger

  # pylint: disable=invalid-name
  def call(self, total_bytes_transferred, total_size):
    """Forcibly exits if the transfer has passed the halting point."""
    if total_bytes_transferred >= self.halt_at_byte:
      if self.logger.isEnabledFor(logging.INFO):
        sys.stderr.write(
            'Halting transfer after byte %s. %s/%s transferred.\r\n' % (
                self.halt_at_byte, MakeHumanReadable(total_bytes_transferred),
                MakeHumanReadable(total_size)))
      # Fix: corrected misspelled exception messages
      # ('Artifically' -> 'Artificially').
      if self.is_upload:
        raise ResumableUploadException('Artificially halting upload.')
      else:
        raise ResumableDownloadException('Artificially halting download.')
1276
1277
def _LogCopyOperation(logger, src_url, dst_url, dst_obj_metadata):
  """Logs copy operation, including Content-Type if appropriate.

  Args:
    logger: logger instance to use for output.
    src_url: Source StorageUrl.
    dst_url: Destination StorageUrl.
    dst_obj_metadata: Object-specific metadata that should be overidden during
                      the copy.
  """
  # Mention the Content-Type only for cloud destinations that have one set.
  announce_suffix = ''
  if (dst_url.IsCloudUrl() and dst_obj_metadata and
      dst_obj_metadata.contentType):
    announce_suffix = ' [Content-Type=%s]' % dst_obj_metadata.contentType
  if src_url.IsFileUrl() and src_url.IsStream():
    logger.info('Copying from <STDIN>%s...', announce_suffix)
  else:
    logger.info('Copying %s%s...', src_url.url_string, announce_suffix)
1297
1298
# pylint: disable=undefined-variable
def _CopyObjToObjInTheCloud(src_url, src_obj_size, dst_url,
                            dst_obj_metadata, preconditions, gsutil_api):
  """Performs copy-in-the cloud from specified src to dest object.

  Args:
    src_url: Source CloudUrl.
    src_obj_size: Size of source object.
    dst_url: Destination CloudUrl.
    dst_obj_metadata: Object-specific metadata that should be overidden during
                      the copy.
    preconditions: Preconditions to use for the copy.
    gsutil_api: gsutil Cloud API instance to use for the copy.

  Returns:
    (elapsed_time, bytes_transferred, dst_url with generation,
    md5 hash of destination) excluding overhead like initial GET.

  Raises:
    CommandException: if errors encountered.
  """
  start_time = time.time()
  copied_obj = gsutil_api.CopyObject(
      src_url.bucket_name, src_url.object_name,
      src_generation=src_url.generation, dst_obj_metadata=dst_obj_metadata,
      canned_acl=global_copy_helper_opts.canned_acl,
      preconditions=preconditions, provider=dst_url.scheme,
      fields=UPLOAD_RETURN_FIELDS)
  elapsed_time = time.time() - start_time

  # Attach the new generation so callers can refer to the exact copy made.
  result_url = dst_url.Clone()
  result_url.generation = GenerationFromUrlAndString(
      result_url, copied_obj.generation)

  return (elapsed_time, src_obj_size, result_url, copied_obj.md5Hash)
1336
1337
def _CheckFreeSpace(path):
  """Return path/drive free space (in bytes)."""
  if IS_WINDOWS:
    # Use the Win32 GetDiskFreeSpaceEx API via ctypes (WINFUNCTYPE, windll,
    # etc. are presumably imported at module level on Windows).
    # pylint: disable=g-import-not-at-top
    try:
      # Prefer the Unicode (W) variant; fall back to the ANSI (A) variant
      # below if it is unavailable.
      # pylint: disable=invalid-name
      get_disk_free_space_ex = WINFUNCTYPE(c_int, c_wchar_p,
                                           POINTER(c_uint64),
                                           POINTER(c_uint64),
                                           POINTER(c_uint64))
      get_disk_free_space_ex = get_disk_free_space_ex(
          ('GetDiskFreeSpaceExW', windll.kernel32), (
              (1, 'lpszPathName'),
              (2, 'lpFreeUserSpace'),
              (2, 'lpTotalSpace'),
              (2, 'lpFreeSpace'),))
    except AttributeError:
      get_disk_free_space_ex = WINFUNCTYPE(c_int, c_char_p,
                                           POINTER(c_uint64),
                                           POINTER(c_uint64),
                                           POINTER(c_uint64))
      get_disk_free_space_ex = get_disk_free_space_ex(
          ('GetDiskFreeSpaceExA', windll.kernel32), (
              (1, 'lpszPathName'),
              (2, 'lpFreeUserSpace'),
              (2, 'lpTotalSpace'),
              (2, 'lpFreeSpace'),))

    def GetDiskFreeSpaceExErrCheck(result, unused_func, args):
      if not result:
        raise WinError()
      # args[1] is the first output parameter: free bytes available to the
      # calling user.
      return args[1].value
    get_disk_free_space_ex.errcheck = GetDiskFreeSpaceExErrCheck

    # NOTE(review): on Windows this ignores 'path' and reports free space on
    # %SystemDrive% — confirm this matches where temp files are created.
    return get_disk_free_space_ex(os.getenv('SystemDrive'))
  else:
    # statvfs: f_frsize is the fragment size; f_bavail is the number of free
    # blocks available to unprivileged users.
    (_, f_frsize, _, _, f_bavail, _, _, _, _, _) = os.statvfs(path)
    return f_frsize * f_bavail
1376
1377
def _SetContentTypeFromFile(src_url, dst_obj_metadata):
  """Detects and sets Content-Type if src_url names a local file.

  Args:
    src_url: Source StorageUrl.
    dst_obj_metadata: Object-specific metadata that should be overidden during
                      the copy.
  """
  # contentType == '' if user requested default type, so only detect when it
  # is None, and only for non-stream local files. Object-to-object copies
  # with no -h Content-Type specified re-use the source object's type.
  if (dst_obj_metadata.contentType is not None
      or not src_url.IsFileUrl()
      or src_url.IsStream()):
    return

  object_name = src_url.object_name
  detected_type = None
  # Streams (denoted by '-') are expected to be 'application/octet-stream'
  # and 'file' would partially consume them.
  if object_name != '-':
    if config.getbool('GSUtil', 'use_magicfile', False):
      p = subprocess.Popen(['file', '--mime-type', object_name],
                           stdout=subprocess.PIPE, stderr=subprocess.PIPE)
      output, error = p.communicate()
      if p.returncode != 0 or error:
        raise CommandException(
            'Encountered error running "file --mime-type %s" '
            '(returncode=%d).\n%s' % (object_name, p.returncode, error))
      # Parse output by removing line delimiter and splitting on last ": ".
      detected_type = output.rstrip().rpartition(': ')[2]
    else:
      detected_type = mimetypes.guess_type(object_name)[0]
  # Fall back to the default for unknown (None) or empty detections.
  dst_obj_metadata.contentType = detected_type or DEFAULT_CONTENT_TYPE
1412
1413
# pylint: disable=undefined-variable
def _UploadFileToObjectNonResumable(src_url, src_obj_filestream,
                                    src_obj_size, dst_url, dst_obj_metadata,
                                    preconditions, gsutil_api, logger):
  """Uploads the file using a non-resumable strategy.

  Args:
    src_url: Source StorageUrl to upload.
    src_obj_filestream: File pointer to uploadable bytes.
    src_obj_size: Size of the source object.
    dst_url: Destination StorageUrl for the upload.
    dst_obj_metadata: Metadata for the target object.
    preconditions: Preconditions for the upload, if any.
    gsutil_api: gsutil Cloud API instance to use for the upload.
    logger: For outputting log messages.

  Returns:
    Elapsed upload time, uploaded Object with generation, md5, and size fields
    populated.
  """
  announce_callback = FileProgressCallbackHandler(
      ConstructAnnounceText('Uploading', dst_url.url_string), logger).call
  # Arguments shared by the streaming and non-streaming upload paths.
  common_kwargs = dict(
      object_metadata=dst_obj_metadata,
      canned_acl=global_copy_helper_opts.canned_acl,
      preconditions=preconditions, progress_callback=announce_callback,
      provider=dst_url.scheme, fields=UPLOAD_RETURN_FIELDS)
  start_time = time.time()
  if src_url.IsStream():
    # TODO: gsutil-beta: Provide progress callbacks for streaming uploads.
    uploaded_object = gsutil_api.UploadObjectStreaming(
        src_obj_filestream, **common_kwargs)
  else:
    uploaded_object = gsutil_api.UploadObject(
        src_obj_filestream, size=src_obj_size, **common_kwargs)
  return time.time() - start_time, uploaded_object
1455
1456
# pylint: disable=undefined-variable
def _UploadFileToObjectResumable(src_url, src_obj_filestream,
                                 src_obj_size, dst_url, dst_obj_metadata,
                                 preconditions, gsutil_api, logger):
  """Uploads the file using a resumable strategy.

  Args:
    src_url: Source FileUrl to upload.  Must not be a stream.
    src_obj_filestream: File pointer to uploadable bytes.
    src_obj_size: Size of the source object.
    dst_url: Destination StorageUrl for the upload.
    dst_obj_metadata: Metadata for the target object.
    preconditions: Preconditions for the upload, if any.
    gsutil_api: gsutil Cloud API instance to use for the upload.
    logger: for outputting log messages.

  Returns:
    Elapsed upload time, uploaded Object with generation, md5, and size fields
    populated.
  """
  tracker_file_name = GetTrackerFilePath(
      dst_url, TrackerFileType.UPLOAD,
      gsutil_api.GetApiSelector(provider=dst_url.scheme))

  def _UploadTrackerCallback(serialization_data):
    """Creates a new tracker file for starting an upload from scratch.

    This function is called by the gsutil Cloud API implementation and the
    the serialization data is implementation-specific.

    Args:
      serialization_data: Serialization data used in resuming the upload.

    Raises:
      CommandException: if the tracker file cannot be written.
    """
    tracker_file = None
    try:
      tracker_file = open(tracker_file_name, 'w')
      tracker_file.write(str(serialization_data))
    except IOError as e:
      raise CommandException(TRACKER_FILE_UNWRITABLE_EXCEPTION_TEXT %
                             (tracker_file_name, e.strerror))
    finally:
      if tracker_file:
        tracker_file.close()

  # This contains the upload URL, which will uniquely identify the
  # destination object.
  tracker_data = _GetUploadTrackerData(tracker_file_name, logger)
  if tracker_data:
    logger.info(
        'Resuming upload for %s', src_url.url_string)

  # Assume failures are retryable until proven otherwise; the tracker file
  # must survive retryable failures so a later attempt can resume.
  retryable = True

  progress_callback = FileProgressCallbackHandler(
      ConstructAnnounceText('Uploading', dst_url.url_string), logger).call
  # Test hook: deliberately abort the transfer partway through.
  if global_copy_helper_opts.halt_at_byte:
    progress_callback = _HaltingCopyCallbackHandler(
        True, dst_url, global_copy_helper_opts.halt_at_byte, logger).call

  start_time = time.time()
  try:
    uploaded_object = gsutil_api.UploadObjectResumable(
        src_obj_filestream, object_metadata=dst_obj_metadata,
        canned_acl=global_copy_helper_opts.canned_acl,
        preconditions=preconditions, provider=dst_url.scheme,
        size=src_obj_size, serialization_data=tracker_data,
        fields=UPLOAD_RETURN_FIELDS,
        tracker_callback=_UploadTrackerCallback,
        progress_callback=progress_callback)
    retryable = False
  except ResumableUploadAbortException:
    # A permanent abort; retrying with the same tracker state won't help.
    retryable = False
    raise
  finally:
    # On a permanent outcome (success or non-retryable failure) the tracker
    # file is no longer useful; keep it for retryable failures.
    if not retryable:
      _DeleteTrackerFile(tracker_file_name)

  end_time = time.time()
  elapsed_time = end_time - start_time

  return (elapsed_time, uploaded_object)
1538
1539
def _CompressFileForUpload(src_url, src_obj_filestream, src_obj_size, logger):
  """Compresses a to-be-uploaded local file to save bandwidth.

  Args:
    src_url: Source FileUrl.
    src_obj_filestream: Read stream of the source file - will be consumed
                        and closed.
    src_obj_size: Size of the source file.
    logger: for outputting log messages.

  Returns:
    StorageUrl path to compressed file, compressed file size.

  Raises:
    CommandException: if inadequate temp space is available for compression.
  """
  # TODO: Compress using a streaming model as opposed to all at once here.
  if src_obj_size >= MIN_SIZE_COMPUTE_LOGGING:
    logger.info(
        'Compressing %s (to tmp)...', src_url)
  (gzip_fh, gzip_path) = tempfile.mkstemp()
  gzip_fp = None
  try:
    try:
      # Check for temp space. Assume the compressed object is at most 2x
      # the size of the object (normally should compress to smaller than
      # the object)
      if _CheckFreeSpace(gzip_path) < 2*int(src_obj_size):
        raise CommandException('Inadequate temp space available to compress '
                               '%s. See the CHANGING TEMP DIRECTORIES section '
                               'of "gsutil help cp" for more info.' % src_url)
      gzip_fp = gzip.open(gzip_path, 'wb')
      data = src_obj_filestream.read(GZIP_CHUNK_SIZE)
      while data:
        gzip_fp.write(data)
        data = src_obj_filestream.read(GZIP_CHUNK_SIZE)
    finally:
      if gzip_fp:
        gzip_fp.close()
      os.close(gzip_fh)
      src_obj_filestream.close()
  except Exception:
    # Fix: don't leak the temp file when compression fails (e.g. inadequate
    # temp space or a read error) - mkstemp files are the caller's
    # responsibility to remove.
    if os.path.exists(gzip_path):
      os.unlink(gzip_path)
    raise
  gzip_size = os.path.getsize(gzip_path)
  return StorageUrlFromString(gzip_path), gzip_size
1579
1580
def _UploadFileToObject(src_url, src_obj_filestream, src_obj_size,
                        dst_url, dst_obj_metadata, preconditions, gsutil_api,
                        logger, command_obj, copy_exception_handler,
                        gzip_exts=None, allow_splitting=True):
  """Uploads a local file to an object.

  Args:
    src_url: Source FileUrl.
    src_obj_filestream: Read stream of the source file to be read and closed.
    src_obj_size: Size of the source file.
    dst_url: Destination CloudUrl.
    dst_obj_metadata: Metadata to be applied to the destination object.
    preconditions: Preconditions to use for the copy.
    gsutil_api: gsutil Cloud API to use for the copy.
    logger: for outputting log messages.
    command_obj: command object for use in Apply in parallel composite uploads.
    copy_exception_handler: For handling copy exceptions during Apply.
    gzip_exts: List of file extensions to gzip prior to upload, if any.
    allow_splitting: Whether to allow the file to be split into component
        pieces for an parallel composite upload.

  Returns:
    (elapsed_time, bytes_transferred, dst_url with generation,
    md5 hash of destination) excluding overhead like initial GET.

  Raises:
    CommandException: if errors encountered.
  """
  # Apply the configured default content language only when the caller didn't
  # already set one. (Guard on dst_obj_metadata being present; the original
  # code would have raised AttributeError here if it were None.)
  if dst_obj_metadata and not dst_obj_metadata.contentLanguage:
    content_language = config.get_value('GSUtil', 'content_language')
    if content_language:
      dst_obj_metadata.contentLanguage = content_language

  fname_parts = src_url.object_name.split('.')
  upload_url = src_url
  upload_stream = src_obj_filestream
  upload_size = src_obj_size
  zipped_file = False
  # Compress to a temp file first when the extension is in gzip_exts; the
  # upload then streams from the compressed temp file.
  if gzip_exts and len(fname_parts) > 1 and fname_parts[-1] in gzip_exts:
    upload_url, upload_size = _CompressFileForUpload(
        src_url, src_obj_filestream, src_obj_size, logger)
    upload_stream = open(upload_url.object_name, 'rb')
    dst_obj_metadata.contentEncoding = 'gzip'
    zipped_file = True

  elapsed_time = None
  uploaded_object = None
  hash_algs = GetUploadHashAlgs()
  digesters = dict((alg, hash_algs[alg]()) for alg in hash_algs or {})

  parallel_composite_upload = _ShouldDoParallelCompositeUpload(
      logger, allow_splitting, upload_url, dst_url, src_obj_size,
      canned_acl=global_copy_helper_opts.canned_acl)

  if not parallel_composite_upload and hash_algs:
    # Parallel composite uploads calculate hashes per-component in subsequent
    # calls to this function, but the composition of the final object is a
    # cloud-only operation.
    wrapped_filestream = HashingFileUploadWrapper(upload_stream, digesters,
                                                  hash_algs, upload_url, logger)
  else:
    wrapped_filestream = upload_stream

  try:
    if parallel_composite_upload:
      elapsed_time, uploaded_object = _DoParallelCompositeUpload(
          upload_stream, upload_url, dst_url, dst_obj_metadata,
          global_copy_helper_opts.canned_acl, upload_size, preconditions,
          gsutil_api, command_obj, copy_exception_handler)
    elif upload_size < ResumableThreshold() or src_url.IsStream():
      elapsed_time, uploaded_object = _UploadFileToObjectNonResumable(
          upload_url, wrapped_filestream, upload_size, dst_url,
          dst_obj_metadata, preconditions, gsutil_api, logger)
    else:
      elapsed_time, uploaded_object = _UploadFileToObjectResumable(
          upload_url, wrapped_filestream, upload_size, dst_url,
          dst_obj_metadata, preconditions, gsutil_api, logger)

  finally:
    if zipped_file:
      try:
        os.unlink(upload_url.object_name)
      # Windows sometimes complains the temp file is locked when you try to
      # delete it.
      except Exception:  # pylint: disable=broad-except
        logger.warning(
            'Could not delete %s. This can occur in Windows because the '
            'temporary file is still locked.', upload_url.object_name)
    # In the gzip case, this is the gzip stream. _CompressFileForUpload will
    # have already closed the original source stream.
    upload_stream.close()

  if not parallel_composite_upload:
    try:
      digests = _CreateDigestsFromDigesters(digesters)
      _CheckHashes(logger, dst_url, uploaded_object, src_url.object_name,
                   digests, is_upload=True)
    except CommandException as e:
      # If the digest doesn't match, delete the object.
      if 'doesn\'t match cloud-supplied digest' in str(e):
        gsutil_api.DeleteObject(dst_url.bucket_name, dst_url.object_name,
                                generation=uploaded_object.generation,
                                provider=dst_url.scheme)
      raise

  result_url = dst_url.Clone()

  # (Removed a redundant direct assignment of uploaded_object.generation that
  # was immediately overwritten by the line below, matching the daisy-chain
  # code path.)
  result_url.generation = GenerationFromUrlAndString(
      result_url, uploaded_object.generation)

  return (elapsed_time, uploaded_object.size, result_url,
          uploaded_object.md5Hash)
1694
1695
1696 # TODO: Refactor this long function into smaller pieces.
1697 # pylint: disable=too-many-statements
# TODO: Refactor this long function into smaller pieces.
# pylint: disable=too-many-statements
def _DownloadObjectToFile(src_url, src_obj_metadata, dst_url,
                          gsutil_api, logger, test_method=None):
  """Downloads an object to a local file.

  Handles one-shot and resumable strategies, per-chunk hash digesting,
  gzip-encoded objects (downloaded to a temp file, then validated/unzipped
  by _ValidateDownloadHashes), and resume-from-partial-file logic.

  Args:
    src_url: Source CloudUrl.
    src_obj_metadata: Metadata from the source object.
    dst_url: Destination FileUrl.
    gsutil_api: gsutil Cloud API instance to use for the download.
    logger: for outputting log messages.
    test_method: Optional test method for modifying the file before validation
        during unit tests.

  Returns:
    (elapsed_time, bytes_transferred, dst_url, md5), excluding overhead like
    initial GET.

  Raises:
    CommandException: if errors encountered.
  """
  file_name = dst_url.object_name
  dir_name = os.path.dirname(file_name)
  if dir_name and not os.path.exists(dir_name):
    # Do dir creation in try block so can ignore case where dir already
    # exists. This is needed to avoid a race condition when running gsutil
    # -m cp.
    try:
      os.makedirs(dir_name)
    except OSError, e:
      if e.errno != errno.EEXIST:
        raise
  api_selector = gsutil_api.GetApiSelector(provider=src_url.scheme)
  # For gzipped objects download to a temp file and unzip. For the XML API,
  # this represents the result of a HEAD request. For the JSON API, this is
  # the stored encoding which the service may not respect. However, if the
  # server sends decompressed bytes for a file that is stored compressed
  # (double compressed case), there is no way we can validate the hash and
  # we will fail our hash check for the object.
  if (src_obj_metadata.contentEncoding and
      src_obj_metadata.contentEncoding.lower().endswith('gzip')):
    # We can't use tempfile.mkstemp() here because we need a predictable
    # filename for resumable downloads.
    download_file_name = _GetDownloadZipFileName(file_name)
    logger.info(
        'Downloading to temp gzip filename %s', download_file_name)
    need_to_unzip = True
  else:
    download_file_name = file_name
    need_to_unzip = False

  if download_file_name.endswith(dst_url.delim):
    logger.warn('\n'.join(textwrap.wrap(
        'Skipping attempt to download to filename ending with slash (%s). This '
        'typically happens when using gsutil to download from a subdirectory '
        'created by the Cloud Console (https://cloud.google.com/console)'
        % download_file_name)))
    return (0, 0, dst_url, '')

  # Set up hash digesters.
  hash_algs = GetDownloadHashAlgs(
      logger, src_has_md5=src_obj_metadata.md5Hash,
      src_has_crc32c=src_obj_metadata.crc32c)
  digesters = dict((alg, hash_algs[alg]()) for alg in hash_algs or {})

  fp = None
  # Tracks whether the server used a gzip encoding.
  server_encoding = None
  download_complete = False
  download_strategy = _SelectDownloadStrategy(src_obj_metadata, dst_url)
  download_start_point = 0
  # This is used for resuming downloads, but also for passing the mediaLink
  # and size into the download for new downloads so that we can avoid
  # making an extra HTTP call.
  serialization_data = None
  serialization_dict = GetDownloadSerializationDict(src_obj_metadata)
  try:
    if download_strategy is CloudApi.DownloadStrategy.ONE_SHOT:
      fp = open(download_file_name, 'wb')
    elif download_strategy is CloudApi.DownloadStrategy.RESUMABLE:
      # If this is a resumable download, we need to open the file for append
      # and manage a tracker file.
      fp = open(download_file_name, 'ab')

      resuming = _ReadOrCreateDownloadTrackerFile(
          src_obj_metadata, dst_url, api_selector)
      if resuming:
        # Find out how far along we are so we can request the appropriate
        # remaining range of the object.
        existing_file_size = GetFileSize(fp, position_to_eof=True)
        if existing_file_size > src_obj_metadata.size:
          # Local partial file is larger than the object; it can't be a
          # valid partial, so start over.
          _DeleteTrackerFile(GetTrackerFilePath(
              dst_url, TrackerFileType.DOWNLOAD, api_selector))
          raise CommandException(
              '%s is larger (%d) than %s (%d).\nDeleting tracker file, so '
              'if you re-try this download it will start from scratch' %
              (fp.name, existing_file_size, src_url.object_name,
               src_obj_metadata.size))
        else:
          if existing_file_size == src_obj_metadata.size:
            logger.info(
                'Download already complete for file %s, skipping download but '
                'will run integrity checks.', download_file_name)
            download_complete = True
          else:
            download_start_point = existing_file_size
            serialization_dict['progress'] = download_start_point
            logger.info('Resuming download for %s', src_url.url_string)
          # Catch up our digester with the hash data.
          if existing_file_size > TEN_MB:
            for alg_name in digesters:
              logger.info(
                  'Catching up %s for %s', alg_name, download_file_name)
          with open(download_file_name, 'rb') as hash_fp:
            while True:
              data = hash_fp.read(DEFAULT_FILE_BUFFER_SIZE)
              if not data:
                break
              for alg_name in digesters:
                digesters[alg_name].update(data)
      else:
        # Starting a new download, blow away whatever is already there.
        fp.truncate(0)

    else:
      # NOTE(review): message renders as '...chosen forfile...' (missing
      # space between the two string fragments); also fp is None in this
      # branch, so fp.name would raise AttributeError — confirm unreachable.
      raise CommandException('Invalid download strategy %s chosen for'
                             'file %s' % (download_strategy, fp.name))

    if not dst_url.IsStream():
      serialization_data = json.dumps(serialization_dict)

    progress_callback = FileProgressCallbackHandler(
        ConstructAnnounceText('Downloading', dst_url.url_string),
        logger).call
    # Test hook: deliberately abort the transfer at a given byte offset.
    if global_copy_helper_opts.halt_at_byte:
      progress_callback = _HaltingCopyCallbackHandler(
          False, dst_url, global_copy_helper_opts.halt_at_byte, logger).call

    start_time = time.time()
    # TODO: With gzip encoding (which may occur on-the-fly and not be part of
    # the object's metadata), when we request a range to resume, it's possible
    # that the server will just resend the entire object, which means our
    # caught-up hash will be incorrect.  We recalculate the hash on
    # the local file in the case of a failed gzip hash anyway, but it would
    # be better if we actively detected this case.
    if not download_complete:
      server_encoding = gsutil_api.GetObjectMedia(
          src_url.bucket_name, src_url.object_name, fp,
          start_byte=download_start_point, generation=src_url.generation,
          object_size=src_obj_metadata.size,
          download_strategy=download_strategy, provider=src_url.scheme,
          serialization_data=serialization_data, digesters=digesters,
          progress_callback=progress_callback)

    end_time = time.time()

    # If a custom test method is defined, call it here. For the copy command,
    # test methods are expected to take one argument: an open file pointer,
    # and are used to perturb the open file during download to exercise
    # download error detection.
    if test_method:
      test_method(fp)
  except ResumableDownloadException as e:
    logger.warning('Caught ResumableDownloadException (%s) for file %s.',
                   e.reason, file_name)
    raise
  finally:
    if fp:
      fp.close()

  # If we decompressed a content-encoding gzip file on the fly, this may not
  # be accurate, but it is the best we can do without going deep into the
  # underlying HTTP libraries. Note that this value is only used for
  # reporting in log messages; inaccuracy doesn't impact the integrity of the
  # download.
  bytes_transferred = src_obj_metadata.size - download_start_point
  server_gzip = server_encoding and server_encoding.lower().endswith('gzip')
  # Validation also performs the unzip/rename steps for gzipped content and
  # cleans up the tracker file on success.
  local_md5 = _ValidateDownloadHashes(logger, src_url, src_obj_metadata,
                                      dst_url, need_to_unzip, server_gzip,
                                      digesters, hash_algs, api_selector,
                                      bytes_transferred)

  return (end_time - start_time, bytes_transferred, dst_url, local_md5)
1879
1880
1881 def _GetDownloadZipFileName(file_name):
1882 """Returns the file name for a temporarily compressed downloaded file."""
1883 return '%s_.gztmp' % file_name
1884
1885
def _ValidateDownloadHashes(logger, src_url, src_obj_metadata, dst_url,
                            need_to_unzip, server_gzip, digesters, hash_algs,
                            api_selector, bytes_transferred):
  """Validates a downloaded file's integrity.

  Args:
    logger: For outputting log messages.
    src_url: StorageUrl for the source object.
    src_obj_metadata: Metadata for the source object, potentially containing
                      hash values.
    dst_url: StorageUrl describing the destination file.
    need_to_unzip: If true, a temporary zip file was used and must be
                   uncompressed as part of validation.
    server_gzip: If true, the server gzipped the bytes (regardless of whether
                 the object metadata claimed it was gzipped).
    digesters: dict of {string, hash digester} that contains up-to-date digests
               computed during the download. If a digester for a particular
               algorithm is None, an up-to-date digest is not available and the
               hash must be recomputed from the local file.
    hash_algs: dict of {string, hash algorithm} that can be used if digesters
               don't have up-to-date digests.
    api_selector: The Cloud API implementation used (used tracker file naming).
    bytes_transferred: Number of bytes downloaded (used for logging).

  Returns:
    An MD5 of the local file, if one was calculated as part of the integrity
    check.
  """
  file_name = dst_url.object_name
  # When a temp gzip file was used, validate against that file; otherwise
  # validate the final destination file directly.
  download_file_name = (_GetDownloadZipFileName(file_name) if need_to_unzip else
                        file_name)
  digesters_succeeded = True
  for alg in digesters:
    # If we get a digester with a None algorithm, the underlying
    # implementation failed to calculate a digest, so we will need to
    # calculate one from scratch.
    if not digesters[alg]:
      digesters_succeeded = False
      break

  if digesters_succeeded:
    local_hashes = _CreateDigestsFromDigesters(digesters)
  else:
    # Re-read the downloaded file to compute the digests from scratch.
    local_hashes = _CreateDigestsFromLocalFile(
        logger, hash_algs, download_file_name, src_obj_metadata)

  digest_verified = True
  hash_invalid_exception = None
  try:
    _CheckHashes(logger, src_url, src_obj_metadata, download_file_name,
                 local_hashes)
    # Hashes matched; the download is complete, so remove the tracker file.
    _DeleteTrackerFile(GetTrackerFilePath(
        dst_url, TrackerFileType.DOWNLOAD, api_selector))
  except CommandException, e:
    # If an non-gzipped object gets sent with gzip content encoding, the hash
    # we calculate will match the gzipped bytes, not the original object. Thus,
    # we'll need to calculate and check it after unzipping.
    if ('doesn\'t match cloud-supplied digest' in str(e) and
        (server_gzip or api_selector == ApiSelector.XML)):
      if server_gzip:
        logger.debug(
            'Hash did not match but server gzipped the content, will '
            'recalculate.')
      else:
        logger.debug(
            'Hash did not match but server may have gzipped the content, will '
            'recalculate.')
      # Save off the exception in case this isn't a gzipped file.
      hash_invalid_exception = e
      digest_verified = False
    else:
      # Genuine corruption (or unrelated failure): remove both the tracker
      # file and the corrupt local file before propagating.
      _DeleteTrackerFile(GetTrackerFilePath(
          dst_url, TrackerFileType.DOWNLOAD, api_selector))
      os.unlink(file_name)
      raise

  if server_gzip and not need_to_unzip:
    # Server compressed bytes on-the-fly, thus we need to rename and decompress.
    # We can't decompress on-the-fly because prior to Python 3.2 the gzip
    # module makes a bunch of seek calls on the stream.
    download_file_name = _GetDownloadZipFileName(file_name)
    os.rename(file_name, download_file_name)

  if need_to_unzip or server_gzip:
    # Log that we're uncompressing if the file is big enough that
    # decompressing would make it look like the transfer "stalled" at the end.
    if bytes_transferred > TEN_MB:
      logger.info(
          'Uncompressing downloaded tmp file to %s...', file_name)

    # Downloaded gzipped file to a filename w/o .gz extension, so unzip.
    gzip_fp = None
    try:
      gzip_fp = gzip.open(download_file_name, 'rb')
      with open(file_name, 'wb') as f_out:
        data = gzip_fp.read(GZIP_CHUNK_SIZE)
        while data:
          f_out.write(data)
          data = gzip_fp.read(GZIP_CHUNK_SIZE)
    except IOError, e:
      # In the XML case where we don't know if the file was gzipped, raise
      # the original hash exception if we find that it wasn't.
      if 'Not a gzipped file' in str(e) and hash_invalid_exception:
        # Linter improperly thinks we're raising None despite the above check.
        # pylint: disable=raising-bad-type
        raise hash_invalid_exception
      # NOTE(review): any other IOError (e.g. disk full during unzip) is
      # silently swallowed here and execution continues — confirm intended.
    finally:
      if gzip_fp:
        gzip_fp.close()

    os.unlink(download_file_name)

  if not digest_verified:
    try:
      # Recalculate hashes on the unzipped local file.
      local_hashes = _CreateDigestsFromLocalFile(logger, hash_algs, file_name,
                                                 src_obj_metadata)
      _CheckHashes(logger, src_url, src_obj_metadata, file_name, local_hashes)
      _DeleteTrackerFile(GetTrackerFilePath(
          dst_url, TrackerFileType.DOWNLOAD, api_selector))
    except CommandException, e:
      # Still mismatched after decompression: clean up tracker and the bad
      # local file, then propagate the hash failure.
      _DeleteTrackerFile(GetTrackerFilePath(
          dst_url, TrackerFileType.DOWNLOAD, api_selector))
      os.unlink(file_name)
      raise

  # Implicitly returns None when no MD5 was computed (e.g. CRC32C-only).
  if 'md5' in local_hashes:
    return local_hashes['md5']
2014
2015
def _CopyFileToFile(src_url, dst_url):
  """Copies a local file to a local file.

  Args:
    src_url: Source FileUrl.
    dst_url: Destination FileUrl.

  Returns:
    (elapsed_time, bytes_transferred, dst_url, md5=None).

  Raises:
    CommandException: if errors encountered.
  """
  src_fp = GetStreamFromFileUrl(src_url)
  dir_name = os.path.dirname(dst_url.object_name)
  if dir_name and not os.path.exists(dir_name):
    # Tolerate a concurrent process creating the directory first (the same
    # race guard used by the download path for gsutil -m).
    try:
      os.makedirs(dir_name)
    except OSError as e:
      if e.errno != errno.EEXIST:
        raise
  start_time = time.time()
  # Use a context manager so the destination is flushed and closed before we
  # measure its size (the original leaked the file handle, so getsize could
  # observe unflushed buffers).
  with open(dst_url.object_name, 'wb') as dst_fp:
    shutil.copyfileobj(src_fp, dst_fp)
  end_time = time.time()
  return (end_time - start_time, os.path.getsize(dst_url.object_name),
          dst_url, None)
2038
2039
2040 def _DummyTrackerCallback(_):
2041 pass
2042
2043
2044 # pylint: disable=undefined-variable
2045 def _CopyObjToObjDaisyChainMode(src_url, src_obj_metadata, dst_url,
2046 dst_obj_metadata, preconditions, gsutil_api,
2047 logger):
2048 """Copies from src_url to dst_url in "daisy chain" mode.
2049
2050 See -D OPTION documentation about what daisy chain mode is.
2051
2052 Args:
2053 src_url: Source CloudUrl
2054 src_obj_metadata: Metadata from source object
2055 dst_url: Destination CloudUrl
2056 dst_obj_metadata: Object-specific metadata that should be overidden during
2057 the copy.
2058 preconditions: Preconditions to use for the copy.
2059 gsutil_api: gsutil Cloud API to use for the copy.
2060 logger: For outputting log messages.
2061
2062 Returns:
2063 (elapsed_time, bytes_transferred, dst_url with generation,
2064 md5 hash of destination) excluding overhead like initial GET.
2065
2066 Raises:
2067 CommandException: if errors encountered.
2068 """
2069 # We don't attempt to preserve ACLs across providers because
2070 # GCS and S3 support different ACLs and disjoint principals.
2071 if (global_copy_helper_opts.preserve_acl
2072 and src_url.scheme != dst_url.scheme):
2073 raise NotImplementedError(
2074 'Cross-provider cp -p not supported')
2075 if not global_copy_helper_opts.preserve_acl:
2076 dst_obj_metadata.acl = []
2077
2078 start_time = time.time()
2079 upload_fp = DaisyChainWrapper(src_url, src_obj_metadata.size, gsutil_api)
2080 if src_obj_metadata.size == 0:
2081 # Resumable uploads of size 0 are not supported.
2082 uploaded_object = gsutil_api.UploadObject(
2083 upload_fp, object_metadata=dst_obj_metadata,
2084 canned_acl=global_copy_helper_opts.canned_acl,
2085 preconditions=preconditions, provider=dst_url.scheme,
2086 fields=UPLOAD_RETURN_FIELDS, size=src_obj_metadata.size)
2087 else:
2088 # TODO: Support process-break resumes. This will resume across connection
2089 # breaks and server errors, but the tracker callback is a no-op so this
2090 # won't resume across gsutil runs.
2091 uploaded_object = gsutil_api.UploadObjectResumable(
2092 upload_fp, object_metadata=dst_obj_metadata,
2093 canned_acl=global_copy_helper_opts.canned_acl,
2094 preconditions=preconditions, provider=dst_url.scheme,
2095 fields=UPLOAD_RETURN_FIELDS, size=src_obj_metadata.size,
2096 progress_callback=FileProgressCallbackHandler(
2097 ConstructAnnounceText('Uploading', dst_url.url_string),
2098 logger).call,
2099 tracker_callback=_DummyTrackerCallback)
2100 end_time = time.time()
2101
2102 try:
2103 _CheckCloudHashes(logger, src_url, dst_url, src_obj_metadata,
2104 uploaded_object)
2105 except CommandException, e:
2106 if 'doesn\'t match cloud-supplied digest' in str(e):
2107 gsutil_api.DeleteObject(dst_url.bucket_name, dst_url.object_name,
2108 generation=uploaded_object.generation,
2109 provider=dst_url.scheme)
2110 raise
2111
2112 result_url = dst_url.Clone()
2113 result_url.generation = GenerationFromUrlAndString(
2114 result_url, uploaded_object.generation)
2115
2116 return (end_time - start_time, src_obj_metadata.size, result_url,
2117 uploaded_object.md5Hash)
2118
2119
2120 # pylint: disable=undefined-variable
2121 # pylint: disable=too-many-statements
def PerformCopy(logger, src_url, dst_url, gsutil_api, command_obj,
                copy_exception_handler, allow_splitting=True,
                headers=None, manifest=None, gzip_exts=None, test_method=None):
  """Performs copy from src_url to dst_url, handling various special cases.

  Dispatches to the download, in-the-cloud copy, daisy-chain, upload, or
  file-to-file helper based on the source/destination URL types.

  Args:
    logger: for outputting log messages.
    src_url: Source StorageUrl.
    dst_url: Destination StorageUrl.
    gsutil_api: gsutil Cloud API instance to use for the copy.
    command_obj: command object for use in Apply in parallel composite uploads.
    copy_exception_handler: for handling copy exceptions during Apply.
    allow_splitting: Whether to allow the file to be split into component
        pieces for an parallel composite upload.
    headers: optional headers to use for the copy operation.
    manifest: optional manifest for tracking copy operations.
    gzip_exts: List of file extensions to gzip for uploads, if any.
    test_method: optional test method for modifying files during unit tests.

  Returns:
    (elapsed_time, bytes_transferred, version-specific dst_url) excluding
    overhead like initial GET.

  Raises:
    ItemExistsError: if no clobber flag is specified and the destination
        object already exists.
    CommandException: if other errors encountered.
  """
  if headers:
    dst_obj_headers = headers.copy()
  else:
    dst_obj_headers = {}

  # Create a metadata instance for each destination object so metadata
  # such as content-type can be applied per-object.
  # Initialize metadata from any headers passed in via -h.
  dst_obj_metadata = ObjectMetadataFromHeaders(dst_obj_headers)

  if dst_url.IsCloudUrl() and dst_url.scheme == 'gs':
    preconditions = PreconditionsFromHeaders(dst_obj_headers)
  else:
    # Non-GCS destinations don't support preconditions; use an empty set.
    preconditions = Preconditions()

  src_obj_metadata = None
  src_obj_filestream = None
  if src_url.IsCloudUrl():
    src_obj_fields = None
    if dst_url.IsCloudUrl():
      # For cloud or daisy chain copy, we need every copyable field.
      # If we're not modifying or overriding any of the fields, we can get
      # away without retrieving the object metadata because the copy
      # operation can succeed with just the destination bucket and object
      # name. But if we are sending any metadata, the JSON API will expect a
      # complete object resource. Since we want metadata like the object size
      # for our own tracking, we just get all of the metadata here.
      src_obj_fields = ['cacheControl', 'componentCount',
                        'contentDisposition', 'contentEncoding',
                        'contentLanguage', 'contentType', 'crc32c',
                        'etag', 'generation', 'md5Hash', 'mediaLink',
                        'metadata', 'metageneration', 'size']
      # We only need the ACL if we're going to preserve it.
      if global_copy_helper_opts.preserve_acl:
        src_obj_fields.append('acl')
      # NOTE: copy_in_the_cloud is only bound on this cloud-to-cloud path;
      # the dispatch at the bottom only reads it on the same path (hence the
      # module-level pylint disable=undefined-variable).
      if (src_url.scheme == dst_url.scheme
          and not global_copy_helper_opts.daisy_chain):
        copy_in_the_cloud = True
      else:
        copy_in_the_cloud = False
    else:
      # Just get the fields needed to validate the download.
      src_obj_fields = ['crc32c', 'contentEncoding', 'contentType', 'etag',
                        'mediaLink', 'md5Hash', 'size']
    try:
      src_generation = GenerationFromUrlAndString(src_url, src_url.generation)
      src_obj_metadata = gsutil_api.GetObjectMetadata(
          src_url.bucket_name, src_url.object_name,
          generation=src_generation, provider=src_url.scheme,
          fields=src_obj_fields)
    except NotFoundException:
      raise CommandException(
          'NotFoundException: Could not retrieve source object %s.' %
          src_url.url_string)
    src_obj_size = src_obj_metadata.size
    dst_obj_metadata.contentType = src_obj_metadata.contentType
    if global_copy_helper_opts.preserve_acl:
      dst_obj_metadata.acl = src_obj_metadata.acl
      # Special case for S3-to-S3 copy URLs using
      # global_copy_helper_opts.preserve_acl.
      # dst_url will be verified in _CopyObjToObjDaisyChainMode if it
      # is not s3 (and thus differs from src_url).
      if src_url.scheme == 's3':
        acl_text = S3MarkerAclFromObjectMetadata(src_obj_metadata)
        if acl_text:
          AddS3MarkerAclToObjectMetadata(dst_obj_metadata, acl_text)
  else:
    try:
      src_obj_filestream = GetStreamFromFileUrl(src_url)
    # NOTE(review): bare except swallows every exception type (including
    # KeyboardInterrupt) and rewrites it as "does not exist" — consider
    # narrowing to the actual I/O errors.
    except:
      raise CommandException('"%s" does not exist.' % src_url)
    if src_url.IsStream():
      # Streams (e.g. stdin) have unknown size until fully read.
      src_obj_size = None
    else:
      src_obj_size = os.path.getsize(src_url.object_name)

  if global_copy_helper_opts.use_manifest:
    # Set the source size in the manifest.
    manifest.Set(src_url.url_string, 'size', src_obj_size)

  # On Windows, stdin is opened as text mode instead of binary which causes
  # problems when piping a binary file, so this switches it to binary mode.
  if IS_WINDOWS and src_url.IsFileUrl() and src_url.IsStream():
    msvcrt.setmode(GetStreamFromFileUrl(src_url).fileno(), os.O_BINARY)

  if global_copy_helper_opts.no_clobber:
    # There are two checks to prevent clobbering:
    # 1) The first check is to see if the URL
    #    already exists at the destination and prevent the upload/download
    #    from happening. This is done by the exists() call.
    # 2) The second check is only relevant if we are writing to gs. We can
    #    enforce that the server only writes the object if it doesn't exist
    #    by specifying the header below. This check only happens at the
    #    server after the complete file has been uploaded. We specify this
    #    header to prevent a race condition where a destination file may
    #    be created after the first check and before the file is fully
    #    uploaded.
    # In order to save on unnecessary uploads/downloads we perform both
    # checks. However, this may come at the cost of additional HTTP calls.
    if preconditions.gen_match:
      raise ArgumentException('Specifying x-goog-if-generation-match is '
                              'not supported with cp -n')
    else:
      # gen_match of 0 requests "write only if the object does not exist".
      preconditions.gen_match = 0
    if dst_url.IsFileUrl() and os.path.exists(dst_url.object_name):
      # The local file may be a partial. Check the file sizes.
      if src_obj_size == os.path.getsize(dst_url.object_name):
        raise ItemExistsError()
    elif dst_url.IsCloudUrl():
      try:
        dst_object = gsutil_api.GetObjectMetadata(
            dst_url.bucket_name, dst_url.object_name, provider=dst_url.scheme)
      except NotFoundException:
        dst_object = None
      if dst_object:
        raise ItemExistsError()

  if dst_url.IsCloudUrl():
    # Cloud storage API gets object and bucket name from metadata.
    dst_obj_metadata.name = dst_url.object_name
    dst_obj_metadata.bucket = dst_url.bucket_name
    if src_url.IsCloudUrl():
      # Preserve relevant metadata from the source object if it's not already
      # provided from the headers.
      CopyObjectMetadata(src_obj_metadata, dst_obj_metadata, override=False)
    else:
      _SetContentTypeFromFile(src_url, dst_obj_metadata)
  else:
    # Files don't have Cloud API metadata.
    dst_obj_metadata = None

  _LogCopyOperation(logger, src_url, dst_url, dst_obj_metadata)

  if global_copy_helper_opts.canned_acl:
    # No canned ACL support in JSON, force XML API to be used for
    # upload/copy operations.
    orig_prefer_api = gsutil_api.prefer_api
    gsutil_api.prefer_api = ApiSelector.XML

  try:
    if src_url.IsCloudUrl():
      if dst_url.IsFileUrl():
        return _DownloadObjectToFile(src_url, src_obj_metadata, dst_url,
                                     gsutil_api, logger,
                                     test_method=test_method)
      elif copy_in_the_cloud:
        return _CopyObjToObjInTheCloud(src_url, src_obj_size, dst_url,
                                       dst_obj_metadata, preconditions,
                                       gsutil_api)
      else:
        return _CopyObjToObjDaisyChainMode(src_url, src_obj_metadata,
                                           dst_url, dst_obj_metadata,
                                           preconditions, gsutil_api, logger)
    else:  # src_url.IsFileUrl()
      if dst_url.IsCloudUrl():
        return _UploadFileToObject(
            src_url, src_obj_filestream, src_obj_size, dst_url,
            dst_obj_metadata, preconditions, gsutil_api, logger, command_obj,
            copy_exception_handler, gzip_exts=gzip_exts,
            allow_splitting=allow_splitting)
      else:  # dst_url.IsFileUrl()
        return _CopyFileToFile(src_url, dst_url)
  finally:
    # Restore the caller's preferred API even if the copy raised.
    if global_copy_helper_opts.canned_acl:
      gsutil_api.prefer_api = orig_prefer_api
2315
2316
class Manifest(object):
  """Stores the manifest items for the CpCommand class."""

  def __init__(self, path):
    """Instantiates the manifest, loading prior state from any existing file.

    Args:
      path: Path to the manifest file; '~' is expanded to the user's home.
    """
    # self.items contains a dictionary of rows
    self.items = {}
    # Maps source URL -> prior result ('OK' or 'skip') parsed from an
    # existing manifest file; used by WasSuccessful to skip re-copies.
    self.manifest_filter = {}
    # Guards concurrent appends to the manifest file.
    self.lock = CreateLock()

    self.manifest_path = os.path.expanduser(path)
    self._ParseManifest()
    self._CreateManifestFile()

  def _ParseManifest(self):
    """Load and parse a manifest file.

    This information will be used to skip any files that have a skip or OK
    status.
    """
    try:
      if os.path.exists(self.manifest_path):
        with open(self.manifest_path, 'rb') as f:
          first_row = True
          reader = csv.reader(f)
          for row in reader:
            if first_row:
              try:
                source_index = row.index('Source')
                result_index = row.index('Result')
              except ValueError:
                # No header and thus not a valid manifest file.
                raise CommandException(
                    'Missing headers in manifest file: %s' % self.manifest_path)
            first_row = False
            source = row[source_index]
            result = row[result_index]
            if result in ['OK', 'skip']:
              # We're always guaranteed to take the last result of a specific
              # source url.
              self.manifest_filter[source] = result
    except IOError:
      raise CommandException('Could not parse %s' % self.manifest_path)

  def WasSuccessful(self, src):
    """Returns whether the specified src url was marked as successful."""
    return src in self.manifest_filter

  def _CreateManifestFile(self):
    """Opens the manifest file and assigns it to the file pointer."""
    try:
      # Only write the header row when the file is new or empty; an existing
      # non-empty manifest is appended to by _WriteRowToManifestFile.
      if ((not os.path.exists(self.manifest_path))
          or (os.stat(self.manifest_path).st_size == 0)):
        # Add headers to the new file.
        with open(self.manifest_path, 'wb', 1) as f:
          writer = csv.writer(f)
          writer.writerow(['Source',
                           'Destination',
                           'Start',
                           'End',
                           'Md5',
                           'UploadId',
                           'Source Size',
                           'Bytes Transferred',
                           'Result',
                           'Description'])
    except IOError:
      raise CommandException('Could not create manifest file.')

  def Set(self, url, key, value):
    """Records one attribute of the manifest row identified by url.

    Args:
      url: Source URL keying the manifest row.
      key: Attribute name (e.g. 'md5', 'size', 'result').
      value: Attribute value; None is ignored (see comment below).
    """
    if value is None:
      # In case we don't have any information to set we bail out here.
      # This is so that we don't clobber existing information.
      # To zero information pass '' instead of None.
      return
    if url in self.items:
      self.items[url][key] = value
    else:
      self.items[url] = {key: value}

  def Initialize(self, source_url, destination_url):
    """Starts a manifest row for a transfer, recording its start time."""
    # Always use the source_url as the key for the item. This is unique.
    self.Set(source_url, 'source_uri', source_url)
    self.Set(source_url, 'destination_uri', destination_url)
    self.Set(source_url, 'start_time', datetime.datetime.utcnow())

  def SetResult(self, source_url, bytes_transferred, result,
                description=''):
    """Records the final result of a transfer and flushes the row to disk.

    Args:
      source_url: Source URL keying the manifest row.
      bytes_transferred: Number of bytes transferred.
      result: Result string (e.g. 'OK', 'skip', 'error').
      description: Optional free-form description (e.g. failure details).
    """
    self.Set(source_url, 'bytes', bytes_transferred)
    self.Set(source_url, 'result', result)
    self.Set(source_url, 'description', description)
    self.Set(source_url, 'end_time', datetime.datetime.utcnow())
    self._WriteRowToManifestFile(source_url)
    self._RemoveItemFromManifest(source_url)

  def _WriteRowToManifestFile(self, url):
    """Writes a manifest entry to the manifest file for the url argument."""
    row_item = self.items[url]
    data = [
        str(row_item['source_uri'].encode(UTF8)),
        str(row_item['destination_uri'].encode(UTF8)),
        '%sZ' % row_item['start_time'].isoformat(),
        '%sZ' % row_item['end_time'].isoformat(),
        row_item['md5'] if 'md5' in row_item else '',
        row_item['upload_id'] if 'upload_id' in row_item else '',
        str(row_item['size']) if 'size' in row_item else '',
        str(row_item['bytes']) if 'bytes' in row_item else '',
        row_item['result'],
        row_item['description'].encode(UTF8)]

    # Acquire a lock to prevent multiple threads writing to the same file at
    # the same time. This would cause a garbled mess in the manifest file.
    with self.lock:
      with open(self.manifest_path, 'a', 1) as f:  # 1 == line buffered
        writer = csv.writer(f)
        writer.writerow(data)

  def _RemoveItemFromManifest(self, url):
    # Remove the item from the dictionary since we're done with it and
    # we don't want the dictionary to grow too large in memory for no good
    # reason.
    del self.items[url]
2438
2439
class ItemExistsError(Exception):
  """Raised for objects skipped because they already exist at the destination."""
2443
2444
def GetPathBeforeFinalDir(url):
  """Returns the path section before the final directory component of the URL.

  This handles cases for file system directories, bucket, and bucket
  subdirectories. Example: for gs://bucket/dir/ we'll return 'gs://bucket',
  and for file://dir we'll return file://

  Args:
    url: StorageUrl representing a filesystem directory, cloud bucket or
         bucket subdir.

  Returns:
    String name of above-described path, sans final path separator.
  """
  delimiter = url.delim
  if url.IsFileUrl():
    # Strip off the scheme prefix and inspect what remains.
    remainder = url.url_string[len('file://'):]
    if delimiter not in remainder:
      return 'file://'
    parent, _, _ = remainder.rstrip(delimiter).rpartition(delimiter)
    return 'file://%s' % parent
  if url.IsBucket():
    return '%s://' % url.scheme
  # Otherwise the URL names a bucket subdirectory.
  parent, _, _ = url.url_string.rstrip(delimiter).rpartition(delimiter)
  return parent
2470
2471
def _HashFilename(filename):
  """Apply a hash function (SHA1) to shorten the passed file name.

  The hashed file name is of the form:

    TRACKER_<hash>.<trailing>

  where <hash> is a SHA1 hash of the original file name and <trailing> is
  the last 16 characters of the original file name. Max file name lengths
  vary by operating system, so the goal of this function is to ensure the
  hashed version takes fewer than 100 characters.

  Args:
    filename: file name to be hashed.

  Returns:
    shorter, hashed version of passed file name
  """
  # Normalize to UTF-8 bytes; round-tripping a byte string through unicode
  # also validates that it is well-formed UTF-8.
  if isinstance(filename, unicode):
    encoded = filename.encode(UTF8)
  else:
    encoded = unicode(filename, UTF8).encode(UTF8)
  digest = hashlib.sha1(encoded).hexdigest()
  return 'TRACKER_' + digest + '.' + encoded[-16:]
2496
2497
2498 def _DivideAndCeil(dividend, divisor):
2499 """Returns ceil(dividend / divisor).
2500
2501 Takes care to avoid the pitfalls of floating point arithmetic that could
2502 otherwise yield the wrong result for large numbers.
2503
2504 Args:
2505 dividend: Dividend for the operation.
2506 divisor: Divisor for the operation.
2507
2508 Returns:
2509 Quotient.
2510 """
2511 quotient = dividend // divisor
2512 if (dividend % divisor) != 0:
2513 quotient += 1
2514 return quotient
2515
2516
def _GetPartitionInfo(file_size, max_components, default_component_size):
  """Gets info about a file partition for parallel composite uploads.

  Args:
    file_size: The number of bytes in the file to be partitioned.
    max_components: The maximum number of components that can be composed.
    default_component_size: The size of a component, assuming that
                            max_components is infinite.
  Returns:
    The number of components in the partitioned file, and the size of each
    component (except the last, which will have a different size iff
    file_size != 0 (mod num_components)).
  """
  # Ideal component count if we could use unlimited components of the
  # default size.
  ideal_count = _DivideAndCeil(file_size, default_component_size)

  # Clamp the count into the range [2, max_components]; the lower bound
  # takes precedence if the two conflict.
  num_components = max(min(ideal_count, max_components), 2)

  # Spread the bytes evenly; the final component absorbs any shortfall.
  component_size = _DivideAndCeil(file_size, num_components)
  return (num_components, component_size)
2539
2540
def _DeleteObjectFn(cls, url_to_delete, thread_state=None):
  """Wrapper function to be used with command.Apply().

  Args:
    cls: Command class instance providing Cloud API access.
    url_to_delete: CloudUrl of the object to delete.
    thread_state: Optional per-thread Cloud API instance.
  """
  api = GetCloudApiInstance(cls, thread_state)
  api.DeleteObject(url_to_delete.bucket_name,
                   url_to_delete.object_name,
                   generation=url_to_delete.generation,
                   provider=url_to_delete.scheme)
2547
2548
def _ParseParallelUploadTrackerFile(tracker_file, tracker_file_lock):
  """Parse the tracker file from the last parallel composite upload attempt.

  If it exists, the tracker file is of the format described in
  _CreateParallelUploadTrackerFile. If the file doesn't exist or cannot be
  read, then the upload will start from the beginning.

  Args:
    tracker_file: The name of the file to parse.
    tracker_file_lock: Lock protecting access to the tracker file.

  Returns:
    random_prefix: A randomly-generated prefix to the name of the
                   temporary components.
    existing_objects: A list of ObjectFromTracker objects representing
                      the set of files that have already been uploaded.
  """
  existing_objects = []
  try:
    with tracker_file_lock:
      # Use a context manager so the file handle is released even if
      # reading fails partway through (the previous open/close pair could
      # leak the descriptor on a non-IOError failure).
      with open(tracker_file, 'r') as f:
        lines = [line.strip() for line in f.readlines()]
  except IOError as e:
    # We can't read the tracker file, so generate a new random prefix.
    lines = [str(random.randint(1, (10 ** 10) - 1))]

    # Ignore non-existent file (happens first time an upload
    # is attempted on a file), but warn user for other errors.
    if e.errno != errno.ENOENT:
      # Will restart because we failed to read in the file.
      print('Couldn\'t read parallel upload tracker file (%s): %s. '
            'Restarting upload from scratch.' % (tracker_file, e.strerror))

  # The first line contains the randomly-generated prefix.
  random_prefix = lines[0]

  # The remaining lines were written in pairs to describe a single component
  # in the form:
  #   object_name (without random prefix)
  #   generation
  # Newlines are used as the delimiter because only newlines and carriage
  # returns are invalid characters in object names, and users can specify
  # a custom prefix in the config file.
  i = 1
  while i < len(lines):
    (name, generation) = (lines[i], lines[i+1])
    if not generation:
      # Cover the '' case.
      generation = None
    existing_objects.append(ObjectFromTracker(name, generation))
    i += 2
  return (random_prefix, existing_objects)
2603
2604
def _AppendComponentTrackerToParallelUploadTrackerFile(tracker_file, component,
                                                       tracker_file_lock):
  """Appends info about the uploaded component to an existing tracker file.

  Follows the format described in _CreateParallelUploadTrackerFile.

  Args:
    tracker_file: Tracker file to append to.
    component: Component that was uploaded.
    tracker_file_lock: Thread and process-safe Lock for the tracker file.
  """
  # Render the single component as newline-terminated tracker lines.
  new_lines = ['%s\n' % line for line in
               _GetParallelUploadTrackerFileLinesForComponents([component])]
  with tracker_file_lock:
    with open(tracker_file, 'a') as f:
      f.writelines(new_lines)
2621
2622
def _CreateParallelUploadTrackerFile(tracker_file, random_prefix, components,
                                     tracker_file_lock):
  """Writes information about components that were successfully uploaded.

  This way the upload can be resumed at a later date. The tracker file has
  the format:
    random_prefix
    temp_object_1_name
    temp_object_1_generation
    .
    .
    .
    temp_object_N_name
    temp_object_N_generation
  where N is the number of components that have been successfully uploaded.

  Args:
    tracker_file: The name of the parallel upload tracker file.
    random_prefix: The randomly-generated prefix that was used for
                   for uploading any existing components.
    components: A list of ObjectFromTracker objects that were uploaded.
    tracker_file_lock: The lock protecting access to the tracker file.

  Raises:
    CommandException: If the tracker file cannot be written.
  """
  lines = [random_prefix]
  lines += _GetParallelUploadTrackerFileLinesForComponents(components)
  lines = [line + '\n' for line in lines]
  try:
    with tracker_file_lock:
      # Mode 'w' truncates any existing contents, so no separate clearing
      # step is needed (previously the file was opened and closed once just
      # to clear it).
      with open(tracker_file, 'w') as f:
        f.writelines(lines)
  except IOError as e:
    raise CommandException(TRACKER_FILE_UNWRITABLE_EXCEPTION_TEXT %
                           (tracker_file, e.strerror))
2657
2658
2659 def _GetParallelUploadTrackerFileLinesForComponents(components):
2660 """Return a list of the lines for use in a parallel upload tracker file.
2661
2662 The lines represent the given components, using the format as described in
2663 _CreateParallelUploadTrackerFile.
2664
2665 Args:
2666 components: A list of ObjectFromTracker objects that were uploaded.
2667
2668 Returns:
2669 Lines describing components with their generation for outputting to the
2670 tracker file.
2671 """
2672 lines = []
2673 for component in components:
2674 generation = None
2675 generation = component.generation
2676 if not generation:
2677 generation = ''
2678 lines += [component.object_name, str(generation)]
2679 return lines
2680
2681
def FilterExistingComponents(dst_args, existing_components, bucket_url,
                             gsutil_api):
  """Determines course of action for component objects.

  Given the list of all target objects based on partitioning the file and
  the list of objects that have already been uploaded successfully,
  this function determines which objects should be uploaded, which
  existing components are still valid, and which existing components should
  be deleted.

  Args:
    dst_args: The map of file_name -> PerformParallelUploadFileToObjectArgs
              calculated by partitioning the file.
    existing_components: A list of ObjectFromTracker objects that have been
                         uploaded in the past.
    bucket_url: CloudUrl of the bucket in which the components exist.
    gsutil_api: gsutil Cloud API instance to use for retrieving object metadata.

  Returns:
    components_to_upload: List of components that need to be uploaded.
    uploaded_components: List of components that have already been
                         uploaded and are still valid.
    existing_objects_to_delete: List of components that have already
                                been uploaded, but are no longer valid
                                and are in a versioned bucket, and
                                therefore should be deleted.
  """
  # Any target component with no counterpart in the tracker file must be
  # uploaded from scratch.
  components_to_upload = []
  existing_component_names = [component.object_name
                              for component in existing_components]
  for component_name in dst_args:
    if component_name not in existing_component_names:
      components_to_upload.append(dst_args[component_name])

  objects_already_chosen = []

  # Don't reuse any temporary components whose MD5 doesn't match the current
  # MD5 of the corresponding part of the file. If the bucket is versioned,
  # also make sure that we delete the existing temporary version.
  existing_objects_to_delete = []
  uploaded_components = []
  for tracker_object in existing_components:
    if (tracker_object.object_name not in dst_args.keys()
        or tracker_object.object_name in objects_already_chosen):
      # This could happen if the component size has changed. This also serves
      # to handle object names that get duplicated in the tracker file due
      # to people doing things they shouldn't (e.g., overwriting an existing
      # temporary component in a versioned bucket).

      url = bucket_url.Clone()
      url.object_name = tracker_object.object_name
      url.generation = tracker_object.generation
      existing_objects_to_delete.append(url)
      continue

    # Compare the local MD5 of this part of the file against the MD5 of the
    # previously-uploaded component to decide whether it can be reused.
    dst_arg = dst_args[tracker_object.object_name]
    file_part = FilePart(dst_arg.filename, dst_arg.file_start,
                         dst_arg.file_length)
    # TODO: calculate MD5's in parallel when possible.
    content_md5 = CalculateB64EncodedMd5FromContents(file_part)

    try:
      # Get the MD5 of the currently-existing component.
      dst_url = dst_arg.dst_url
      dst_metadata = gsutil_api.GetObjectMetadata(
          dst_url.bucket_name, dst_url.object_name,
          generation=dst_url.generation, provider=dst_url.scheme,
          fields=['md5Hash', 'etag'])
      cloud_md5 = dst_metadata.md5Hash
    except Exception:  # pylint: disable=broad-except
      # We don't actually care what went wrong - we couldn't retrieve the
      # object to check the MD5, so just upload it again.
      cloud_md5 = None

    if cloud_md5 != content_md5:
      # Stale or unreadable component: re-upload it, and schedule deletion of
      # the old version if one exists.
      components_to_upload.append(dst_arg)
      objects_already_chosen.append(tracker_object.object_name)
      if tracker_object.generation:
        # If the old object doesn't have a generation (i.e., it isn't in a
        # versioned bucket), then we will just overwrite it anyway.
        invalid_component_with_generation = dst_arg.dst_url.Clone()
        invalid_component_with_generation.generation = tracker_object.generation
        existing_objects_to_delete.append(invalid_component_with_generation)
    else:
      # MD5 matches: the existing component can be reused as-is.
      url = dst_arg.dst_url.Clone()
      url.generation = tracker_object.generation
      uploaded_components.append(url)
      objects_already_chosen.append(tracker_object.object_name)

  if uploaded_components:
    logging.info('Found %d existing temporary components to reuse.',
                 len(uploaded_components))

  return (components_to_upload, uploaded_components,
          existing_objects_to_delete)
OLDNEW
« no previous file with comments | « gslib/commands/web.py ('k') | gslib/cred_types.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698