| OLD | NEW |
| (Empty) | |
| 1 # -*- coding: utf-8 -*- |
| 2 # Copyright 2011 Google Inc. All Rights Reserved. |
| 3 # Copyright 2011, Nexenta Systems Inc. |
| 4 # |
| 5 # Licensed under the Apache License, Version 2.0 (the "License"); |
| 6 # you may not use this file except in compliance with the License. |
| 7 # You may obtain a copy of the License at |
| 8 # |
| 9 # http://www.apache.org/licenses/LICENSE-2.0 |
| 10 # |
| 11 # Unless required by applicable law or agreed to in writing, software |
| 12 # distributed under the License is distributed on an "AS IS" BASIS, |
| 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 14 # See the License for the specific language governing permissions and |
| 15 # limitations under the License. |
| 16 """Helper functions for copy functionality.""" |
| 17 |
| 18 from __future__ import absolute_import |
| 19 |
| 20 import base64 |
| 21 from collections import namedtuple |
| 22 import csv |
| 23 import datetime |
| 24 import errno |
| 25 import gzip |
| 26 import hashlib |
| 27 from hashlib import md5 |
| 28 import json |
| 29 import logging |
| 30 import mimetypes |
| 31 import os |
| 32 import random |
| 33 import re |
| 34 import shutil |
| 35 import stat |
| 36 import subprocess |
| 37 import sys |
| 38 import tempfile |
| 39 import textwrap |
| 40 import time |
| 41 import traceback |
| 42 |
| 43 from boto import config |
| 44 import crcmod |
| 45 |
| 46 import gslib |
| 47 from gslib.cloud_api import ArgumentException |
| 48 from gslib.cloud_api import CloudApi |
| 49 from gslib.cloud_api import NotFoundException |
| 50 from gslib.cloud_api import PreconditionException |
| 51 from gslib.cloud_api import Preconditions |
| 52 from gslib.cloud_api import ResumableDownloadException |
| 53 from gslib.cloud_api import ResumableUploadAbortException |
| 54 from gslib.cloud_api import ResumableUploadException |
| 55 from gslib.cloud_api_helper import GetDownloadSerializationDict |
| 56 from gslib.commands.compose import MAX_COMPOSE_ARITY |
| 57 from gslib.commands.config import DEFAULT_PARALLEL_COMPOSITE_UPLOAD_COMPONENT_SI
ZE |
| 58 from gslib.commands.config import DEFAULT_PARALLEL_COMPOSITE_UPLOAD_THRESHOLD |
| 59 from gslib.cs_api_map import ApiSelector |
| 60 from gslib.daisy_chain_wrapper import DaisyChainWrapper |
| 61 from gslib.exception import CommandException |
| 62 from gslib.file_part import FilePart |
| 63 from gslib.hashing_helper import Base64EncodeHash |
| 64 from gslib.hashing_helper import CalculateB64EncodedMd5FromContents |
| 65 from gslib.hashing_helper import CalculateHashesFromContents |
| 66 from gslib.hashing_helper import GetDownloadHashAlgs |
| 67 from gslib.hashing_helper import GetUploadHashAlgs |
| 68 from gslib.hashing_helper import HashingFileUploadWrapper |
| 69 from gslib.progress_callback import ConstructAnnounceText |
| 70 from gslib.progress_callback import FileProgressCallbackHandler |
| 71 from gslib.progress_callback import ProgressCallbackWithBackoff |
| 72 from gslib.storage_url import ContainsWildcard |
| 73 from gslib.storage_url import StorageUrlFromString |
| 74 from gslib.third_party.storage_apitools import storage_v1_messages as apitools_m
essages |
| 75 from gslib.translation_helper import AddS3MarkerAclToObjectMetadata |
| 76 from gslib.translation_helper import CopyObjectMetadata |
| 77 from gslib.translation_helper import DEFAULT_CONTENT_TYPE |
| 78 from gslib.translation_helper import GenerationFromUrlAndString |
| 79 from gslib.translation_helper import ObjectMetadataFromHeaders |
| 80 from gslib.translation_helper import PreconditionsFromHeaders |
| 81 from gslib.translation_helper import S3MarkerAclFromObjectMetadata |
| 82 from gslib.util import CreateLock |
| 83 from gslib.util import CreateTrackerDirIfNeeded |
| 84 from gslib.util import DEFAULT_FILE_BUFFER_SIZE |
| 85 from gslib.util import GetCloudApiInstance |
| 86 from gslib.util import GetFileSize |
| 87 from gslib.util import GetStreamFromFileUrl |
| 88 from gslib.util import HumanReadableToBytes |
| 89 from gslib.util import IS_WINDOWS |
| 90 from gslib.util import IsCloudSubdirPlaceholder |
| 91 from gslib.util import MakeHumanReadable |
| 92 from gslib.util import MIN_SIZE_COMPUTE_LOGGING |
| 93 from gslib.util import ResumableThreshold |
| 94 from gslib.util import TEN_MB |
| 95 from gslib.util import UTF8 |
| 96 from gslib.wildcard_iterator import CreateWildcardIterator |
| 97 |
| 98 # pylint: disable=g-import-not-at-top |
| 99 if IS_WINDOWS: |
| 100 import msvcrt |
| 101 from ctypes import c_int |
| 102 from ctypes import c_uint64 |
| 103 from ctypes import c_char_p |
| 104 from ctypes import c_wchar_p |
| 105 from ctypes import windll |
| 106 from ctypes import POINTER |
| 107 from ctypes import WINFUNCTYPE |
| 108 from ctypes import WinError |
| 109 |
# Declare copy_helper_opts as a global because namedtuple isn't aware of
# assigning to a class member (which breaks pickling done by multiprocessing).
# For details see
# http://stackoverflow.com/questions/16377215/how-to-pickle-a-namedtuple-instance-correctly
# Similarly can't pickle logger.
# pylint: disable=global-at-module-level
global global_copy_helper_opts, global_logger

# NOTE(review): presumably this namespace prefixes temporary component object
# names for parallel composite uploads -- confirm against the code that
# builds component names (not visible in this chunk).
PARALLEL_UPLOAD_TEMP_NAMESPACE = (
    u'/gsutil/tmp/parallel_composite_uploads/for_details_see/gsutil_help_cp/')

# Static salt mixed into hashed component object names to avoid colliding
# with user-created objects (rationale is in the string itself).
PARALLEL_UPLOAD_STATIC_SALT = u"""
PARALLEL_UPLOAD_SALT_TO_PREVENT_COLLISIONS.
The theory is that no user will have prepended this to the front of
one of their object names and then done an MD5 hash of the name, and
then prepended PARALLEL_UPLOAD_TEMP_NAMESPACE to the front of their object
name. Note that there will be no problems with object name length since we
hash the original name.
"""

# Format string taking (tracker_file_name, OS error text); raised as a
# CommandException message when a tracker file can't be written.
TRACKER_FILE_UNWRITABLE_EXCEPTION_TEXT = (
    'Couldn\'t write tracker file (%s): %s. This can happen if gsutil is '
    'configured to save tracker files to an unwritable directory)')

# When uploading a file, get the following fields in the response for
# filling in command output and manifests.
UPLOAD_RETURN_FIELDS = ['crc32c', 'generation', 'md5Hash', 'size']

# This tuple is used only to encapsulate the arguments needed for
# command.Apply() in the parallel composite upload case.
# Note that content_type is used instead of a full apitools Object() because
# apitools objects are not picklable.
# filename: String name of file.
# file_start: start byte of file (may be in the middle of a file for
#     partitioned files).
# file_length: length of upload (may not be the entire length of a file for
#     partitioned files).
# src_url: FileUrl describing the source file.
# dst_url: CloudUrl describing the destination component file.
# canned_acl: canned_acl to apply to the uploaded file/component.
# content_type: content-type for final object, used for setting content-type
#     of components and final object.
# tracker_file: tracker file for this component.
# tracker_file_lock: tracker file lock for tracker file(s).
PerformParallelUploadFileToObjectArgs = namedtuple(
    'PerformParallelUploadFileToObjectArgs',
    'filename file_start file_length src_url dst_url canned_acl '
    'content_type tracker_file tracker_file_lock')

# (object_name, generation) pair for a previously uploaded component, as
# recorded in / read back from a parallel upload tracker file.
ObjectFromTracker = namedtuple('ObjectFromTracker',
                               'object_name generation')

# The maximum length of a file name can vary wildly between different
# operating systems, so we always ensure that tracker files are less
# than 100 characters in order to avoid any such issues.
MAX_TRACKER_FILE_NAME_LENGTH = 100

# TODO: Refactor this file to be less cumbersome. In particular, some of the
# different paths (e.g., uploading a file to an object vs. downloading an
# object to a file) could be split into separate files.

# Chunk size to use while zipping/unzipping gzip files.
GZIP_CHUNK_SIZE = 8192

# NOTE(review): presumably files at least this large trigger a "consider
# using parallel composite uploads" suggestion -- confirm against the code
# that reads suggested_parallel_composites (not visible in this chunk).
PARALLEL_COMPOSITE_SUGGESTION_THRESHOLD = 150 * 1024 * 1024

# Module-level flag; not read or written in this chunk -- presumably set once
# the parallel-composite suggestion has been printed. TODO confirm.
suggested_parallel_composites = False
| 178 |
class TrackerFileType(object):
  """Enumerates the kinds of tracker files used to checkpoint transfers."""
  UPLOAD = 'upload'
  DOWNLOAD = 'download'
  PARALLEL_UPLOAD = 'parallel_upload'
| 184 |
| 185 def _RmExceptionHandler(cls, e): |
| 186 """Simple exception handler to allow post-completion status.""" |
| 187 cls.logger.error(str(e)) |
| 188 |
| 189 |
| 190 def _ParallelUploadCopyExceptionHandler(cls, e): |
| 191 """Simple exception handler to allow post-completion status.""" |
| 192 cls.logger.error(str(e)) |
| 193 cls.op_failure_count += 1 |
| 194 cls.logger.debug('\n\nEncountered exception while copying:\n%s\n', |
| 195 traceback.format_exc()) |
| 196 |
| 197 |
def _PerformParallelUploadFileToObject(cls, args, thread_state=None):
  """Function argument to Apply for performing parallel composite uploads.

  Args:
    cls: Calling Command class.
    args: PerformParallelUploadFileToObjectArgs tuple describing the target.
    thread_state: gsutil Cloud API instance to use for the operation.

  Returns:
    StorageUrl representing a successfully uploaded component.
  """
  fp = FilePart(args.filename, args.file_start, args.file_length)
  gsutil_api = GetCloudApiInstance(cls, thread_state=thread_state)
  with fp:
    # We take many precautions with the component names that make collisions
    # effectively impossible. Specifying preconditions will just allow us to
    # reach a state in which uploads will always fail on retries.
    preconditions = None

    # Fill in content type if one was provided.
    dst_object_metadata = apitools_messages.Object(
        name=args.dst_url.object_name,
        bucket=args.dst_url.bucket_name,
        contentType=args.content_type)

    try:
      if global_copy_helper_opts.canned_acl:
        # No canned ACL support in JSON, force XML API to be used for
        # upload/copy operations.
        orig_prefer_api = gsutil_api.prefer_api
        gsutil_api.prefer_api = ApiSelector.XML
      ret = _UploadFileToObject(args.src_url, fp, args.file_length,
                                args.dst_url, dst_object_metadata,
                                preconditions, gsutil_api, cls.logger, cls,
                                _ParallelUploadCopyExceptionHandler,
                                gzip_exts=None, allow_splitting=False)
    finally:
      if global_copy_helper_opts.canned_acl:
        # Restore the caller's preferred API even if the upload raised.
        # (orig_prefer_api is only bound when canned_acl was set above, so
        # this guard must mirror the one in the try block.)
        gsutil_api.prefer_api = orig_prefer_api

    # NOTE(review): ret appears to be a tuple whose third element is the
    # uploaded component -- confirm against _UploadFileToObject's return.
    component = ret[2]
    # Persist the finished component so an interrupted composite upload can
    # skip it on resume.
    _AppendComponentTrackerToParallelUploadTrackerFile(
        args.tracker_file, component, args.tracker_file_lock)
    return ret
| 242 |
| 243 |
# Options shared by CopyHelper and copy-related commands; created via
# CreateCopyHelperOpts and fetched via GetCopyHelperOpts.
CopyHelperOpts = namedtuple('CopyHelperOpts', [
    'perform_mv',
    'no_clobber',
    'daisy_chain',
    'read_args_from_stdin',
    'print_ver',
    'use_manifest',
    'preserve_acl',
    'canned_acl',
    'halt_at_byte',
])
| 254 |
| 255 |
# pylint: disable=global-variable-undefined
def CreateCopyHelperOpts(perform_mv=False, no_clobber=False, daisy_chain=False,
                         read_args_from_stdin=False, print_ver=False,
                         use_manifest=False, preserve_acl=False,
                         canned_acl=None, halt_at_byte=None):
  """Creates CopyHelperOpts for passing options to CopyHelper."""
  # The tuple is the union of options needed by CopyHelper and any
  # copy-related functionality in CpCommand, RsyncCommand, or Command class.
  # Stored in a module global so GetCopyHelperOpts can retrieve it later.
  global global_copy_helper_opts
  # Positional construction matches the declared CopyHelperOpts field order.
  global_copy_helper_opts = CopyHelperOpts(
      perform_mv, no_clobber, daisy_chain, read_args_from_stdin, print_ver,
      use_manifest, preserve_acl, canned_acl, halt_at_byte)
  return global_copy_helper_opts
| 276 |
| 277 |
# pylint: disable=global-variable-undefined
# pylint: disable=global-variable-not-assigned
def GetCopyHelperOpts():
  """Returns the CopyHelperOpts namedtuple stored by CreateCopyHelperOpts."""
  global global_copy_helper_opts
  opts = global_copy_helper_opts
  return opts
| 284 |
| 285 |
def GetTrackerFilePath(dst_url, tracker_file_type, api_selector, src_url=None):
  """Gets the tracker file name described by the arguments.

  Public for testing purposes.

  Args:
    dst_url: Destination URL for tracker file.
    tracker_file_type: TrackerFileType for this operation.
    api_selector: API to use for this operation.
    src_url: Source URL for the source file name for parallel uploads.

  Returns:
    File path to tracker file.
  """
  resumable_tracker_dir = CreateTrackerDirIfNeeded()
  if tracker_file_type == TrackerFileType.UPLOAD:
    # Encode the dest bucket and object name into the tracker file name.
    raw_name = 'resumable_upload__%s__%s__%s.url' % (
        dst_url.bucket_name, dst_url.object_name, api_selector)
  elif tracker_file_type == TrackerFileType.DOWNLOAD:
    # Encode the fully-qualified dest file name into the tracker file name.
    raw_name = 'resumable_download__%s__%s.etag' % (
        os.path.realpath(dst_url.object_name), api_selector)
  elif tracker_file_type == TrackerFileType.PARALLEL_UPLOAD:
    # Encode the dest bucket and object names as well as the source file
    # name into the tracker file name.
    raw_name = 'parallel_upload__%s__%s__%s__%s.url' % (
        dst_url.bucket_name, dst_url.object_name, src_url, api_selector)

  # Flatten path separators, then hash so the name is short and safe as a
  # single path component.
  res_tracker_file_name = _HashFilename(re.sub('[/\\\\]', '_', raw_name))
  tracker_file_name = '%s_%s' % (str(tracker_file_type).lower(),
                                 res_tracker_file_name)
  tracker_file_path = '%s%s%s' % (resumable_tracker_dir, os.sep,
                                  tracker_file_name)
  # Tracker names are hashed precisely so they stay under OS name limits.
  assert len(tracker_file_name) < MAX_TRACKER_FILE_NAME_LENGTH
  return tracker_file_path
| 326 |
| 327 |
def _SelectDownloadStrategy(src_obj_metadata, dst_url):
  """Get download strategy based on the source and dest objects.

  Args:
    src_obj_metadata: Object describing the source object.
    dst_url: Destination StorageUrl.

  Returns:
    gsutil Cloud API DownloadStrategy.
  """
  dst_is_special = False
  if dst_url.IsFileUrl():
    # Check explicitly first because os.stat doesn't work on 'nul' in Windows.
    if dst_url.object_name == os.devnull:
      dst_is_special = True
    try:
      if stat.S_ISCHR(os.stat(dst_url.object_name).st_mode):
        # Character devices (e.g. /dev/null) can't support resumption.
        dst_is_special = True
    except OSError:
      pass

  # Small objects and special destinations get a one-shot download;
  # everything else is resumable.
  if dst_is_special or src_obj_metadata.size < ResumableThreshold():
    return CloudApi.DownloadStrategy.ONE_SHOT
  return CloudApi.DownloadStrategy.RESUMABLE
| 354 |
| 355 |
| 356 def _GetUploadTrackerData(tracker_file_name, logger): |
| 357 """Checks for an upload tracker file and creates one if it does not exist. |
| 358 |
| 359 Args: |
| 360 tracker_file_name: Tracker file name for this upload. |
| 361 logger: for outputting log messages. |
| 362 |
| 363 Returns: |
| 364 Serialization data if the tracker file already exists (resume existing |
| 365 upload), None otherwise. |
| 366 """ |
| 367 tracker_file = None |
| 368 |
| 369 # If we already have a matching tracker file, get the serialization data |
| 370 # so that we can resume the upload. |
| 371 try: |
| 372 tracker_file = open(tracker_file_name, 'r') |
| 373 tracker_data = tracker_file.read() |
| 374 return tracker_data |
| 375 except IOError as e: |
| 376 # Ignore non-existent file (happens first time a upload |
| 377 # is attempted on an object), but warn user for other errors. |
| 378 if e.errno != errno.ENOENT: |
| 379 logger.warn('Couldn\'t read upload tracker file (%s): %s. Restarting ' |
| 380 'upload from scratch.', tracker_file_name, e.strerror) |
| 381 finally: |
| 382 if tracker_file: |
| 383 tracker_file.close() |
| 384 |
| 385 |
def _ReadOrCreateDownloadTrackerFile(src_obj_metadata, dst_url,
                                     api_selector):
  """Checks for a download tracker file and creates one if it does not exist.

  The first line of the tracker file holds the etag of the partially
  downloaded object; a matching etag means the download can be resumed.

  Args:
    src_obj_metadata: Metadata for the source object. Must include
                      etag.
    dst_url: Destination file StorageUrl.
    api_selector: API mode to use (for tracker file naming).

  Returns:
    True if the tracker file already exists (resume existing download),
    False if we created a new tracker file (new download).

  Raises:
    CommandException: if a new tracker file could not be written.
  """
  assert src_obj_metadata.etag
  tracker_file_name = GetTrackerFilePath(
      dst_url, TrackerFileType.DOWNLOAD, api_selector)
  tracker_file = None

  # Check to see if we already have a matching tracker file.
  try:
    tracker_file = open(tracker_file_name, 'r')
    etag_value = tracker_file.readline().rstrip('\n')
    if etag_value == src_obj_metadata.etag:
      return True
  except IOError as e:
    # Ignore non-existent file (happens first time a download
    # is attempted on an object), but warn user for other errors.
    if e.errno != errno.ENOENT:
      # NOTE(review): this prints directly instead of using a logger, unlike
      # the analogous warning in _GetUploadTrackerData -- confirm whether
      # that inconsistency is intentional.
      print('Couldn\'t read URL tracker file (%s): %s. Restarting '
            'download from scratch.' %
            (tracker_file_name, e.strerror))
  finally:
    if tracker_file:
      tracker_file.close()

  # Otherwise, create a new tracker file and start from scratch.
  # os.open with mode 0600 (Python 2 octal literal) restricts the tracker
  # file to owner read/write; os.fdopen wraps the fd so the with-statement
  # closes it.
  try:
    with os.fdopen(os.open(tracker_file_name,
                           os.O_WRONLY | os.O_CREAT, 0600), 'w') as tf:
      tf.write('%s\n' % src_obj_metadata.etag)
    return False
  except IOError as e:
    raise CommandException(TRACKER_FILE_UNWRITABLE_EXCEPTION_TEXT %
                           (tracker_file_name, e.strerror))
  finally:
    # tracker_file is the read handle from above, already closed by the
    # first finally; closing a closed file object is a harmless no-op.
    if tracker_file:
      tracker_file.close()
| 434 |
| 435 |
| 436 def _DeleteTrackerFile(tracker_file_name): |
| 437 if tracker_file_name and os.path.exists(tracker_file_name): |
| 438 os.unlink(tracker_file_name) |
| 439 |
| 440 |
def InsistDstUrlNamesContainer(exp_dst_url, have_existing_dst_container,
                               command_name):
  """Ensures the destination URL names a container.

  Acceptable containers include directory, bucket, bucket
  subdir, and non-existent bucket subdir.

  Args:
    exp_dst_url: Wildcard-expanded destination StorageUrl.
    have_existing_dst_container: bool indicator of whether exp_dst_url
      names a container (directory, bucket, or existing bucket subdir).
    command_name: Name of command making call. May not be the same as the
      calling class's self.command_name in the case of commands implemented
      atop other commands (like mv command).

  Raises:
    CommandException: if the URL being checked does not name a container.
  """
  # A file destination must be a directory; a cloud destination must not be
  # a bare bucket unless that bucket/subdir already exists as a container.
  dst_is_plain_file = (exp_dst_url.IsFileUrl() and
                       not exp_dst_url.IsDirectory())
  if dst_is_plain_file or (exp_dst_url.IsCloudUrl() and
                           exp_dst_url.IsBucket() and
                           not have_existing_dst_container):
    raise CommandException('Destination URL must name a directory, bucket, '
                           'or bucket\nsubdirectory for the multiple '
                           'source form of the %s command.' % command_name)
| 465 |
| 466 |
| 467 def _ShouldTreatDstUrlAsBucketSubDir(have_multiple_srcs, dst_url, |
| 468 have_existing_dest_subdir, |
| 469 src_url_names_container, |
| 470 recursion_requested): |
| 471 """Checks whether dst_url should be treated as a bucket "sub-directory". |
| 472 |
| 473 The decision about whether something constitutes a bucket "sub-directory" |
| 474 depends on whether there are multiple sources in this request and whether |
| 475 there is an existing bucket subdirectory. For example, when running the |
| 476 command: |
| 477 gsutil cp file gs://bucket/abc |
| 478 if there's no existing gs://bucket/abc bucket subdirectory we should copy |
| 479 file to the object gs://bucket/abc. In contrast, if |
| 480 there's an existing gs://bucket/abc bucket subdirectory we should copy |
| 481 file to gs://bucket/abc/file. And regardless of whether gs://bucket/abc |
| 482 exists, when running the command: |
| 483 gsutil cp file1 file2 gs://bucket/abc |
| 484 we should copy file1 to gs://bucket/abc/file1 (and similarly for file2). |
| 485 Finally, for recursive copies, if the source is a container then we should |
| 486 copy to a container as the target. For example, when running the command: |
| 487 gsutil cp -r dir1 gs://bucket/dir2 |
| 488 we should copy the subtree of dir1 to gs://bucket/dir2. |
| 489 |
| 490 Note that we don't disallow naming a bucket "sub-directory" where there's |
| 491 already an object at that URL. For example it's legitimate (albeit |
| 492 confusing) to have an object called gs://bucket/dir and |
| 493 then run the command |
| 494 gsutil cp file1 file2 gs://bucket/dir |
| 495 Doing so will end up with objects gs://bucket/dir, gs://bucket/dir/file1, |
| 496 and gs://bucket/dir/file2. |
| 497 |
| 498 Args: |
| 499 have_multiple_srcs: Bool indicator of whether this is a multi-source |
| 500 operation. |
| 501 dst_url: StorageUrl to check. |
| 502 have_existing_dest_subdir: bool indicator whether dest is an existing |
| 503 subdirectory. |
| 504 src_url_names_container: bool indicator of whether the source URL |
| 505 is a container. |
| 506 recursion_requested: True if a recursive operation has been requested. |
| 507 |
| 508 Returns: |
| 509 bool indicator. |
| 510 """ |
| 511 if have_existing_dest_subdir: |
| 512 return True |
| 513 if dst_url.IsCloudUrl(): |
| 514 return (have_multiple_srcs or |
| 515 (src_url_names_container and recursion_requested)) |
| 516 |
| 517 |
| 518 def _ShouldTreatDstUrlAsSingleton(have_multiple_srcs, |
| 519 have_existing_dest_subdir, dst_url, |
| 520 recursion_requested): |
| 521 """Checks that dst_url names a single file/object after wildcard expansion. |
| 522 |
| 523 It is possible that an object path might name a bucket sub-directory. |
| 524 |
| 525 Args: |
| 526 have_multiple_srcs: Bool indicator of whether this is a multi-source |
| 527 operation. |
| 528 have_existing_dest_subdir: bool indicator whether dest is an existing |
| 529 subdirectory. |
| 530 dst_url: StorageUrl to check. |
| 531 recursion_requested: True if a recursive operation has been requested. |
| 532 |
| 533 Returns: |
| 534 bool indicator. |
| 535 """ |
| 536 if recursion_requested: |
| 537 return False |
| 538 if dst_url.IsFileUrl(): |
| 539 return not dst_url.IsDirectory() |
| 540 else: # dst_url.IsCloudUrl() |
| 541 return (not have_multiple_srcs and |
| 542 not have_existing_dest_subdir and |
| 543 dst_url.IsObject()) |
| 544 |
| 545 |
def ConstructDstUrl(src_url, exp_src_url, src_url_names_container,
                    have_multiple_srcs, exp_dst_url, have_existing_dest_subdir,
                    recursion_requested):
  """Constructs the destination URL for a given exp_src_url/exp_dst_url pair.

  Uses context-dependent naming rules that mimic Linux cp and mv behavior.

  Args:
    src_url: Source StorageUrl to be copied.
    exp_src_url: Single StorageUrl from wildcard expansion of src_url.
    src_url_names_container: True if src_url names a container (including the
        case of a wildcard-named bucket subdir (like gs://bucket/abc,
        where gs://bucket/abc/* matched some objects).
    have_multiple_srcs: True if this is a multi-source request. This can be
        true if src_url wildcard-expanded to multiple URLs or if there were
        multiple source URLs in the request.
    exp_dst_url: the expanded StorageUrl requested for the cp destination.
        Final written path is constructed from this plus a context-dependent
        variant of src_url.
    have_existing_dest_subdir: bool indicator whether dest is an existing
        subdirectory.
    recursion_requested: True if a recursive operation has been requested.

  Returns:
    StorageUrl to use for copy.

  Raises:
    CommandException if destination object name not specified for
    source and source is a stream.
  """
  if _ShouldTreatDstUrlAsSingleton(
      have_multiple_srcs, have_existing_dest_subdir, exp_dst_url,
      recursion_requested):
    # We're copying one file or object to one file or object.
    return exp_dst_url

  if exp_src_url.IsFileUrl() and exp_src_url.IsStream():
    # Streams have no name to derive a destination object name from.
    if have_existing_dest_subdir:
      raise CommandException('Destination object name needed when '
                             'source is a stream')
    return exp_dst_url

  if not recursion_requested and not have_multiple_srcs:
    # We're copying one file or object to a subdirectory. Append final comp
    # of exp_src_url to exp_dst_url.
    src_final_comp = exp_src_url.object_name.rpartition(src_url.delim)[-1]
    return StorageUrlFromString('%s%s%s' % (
        exp_dst_url.url_string.rstrip(exp_dst_url.delim),
        exp_dst_url.delim, src_final_comp))

  # Else we're copying multiple sources to a directory, bucket, or a bucket
  # "sub-directory".

  # Ensure exp_dst_url ends in delim char if we're doing a multi-src copy or
  # a copy to a directory. (The check for copying to a directory needs
  # special-case handling so that the command:
  #   gsutil cp gs://bucket/obj dir
  # will turn into file://dir/ instead of file://dir -- the latter would cause
  # the file "dirobj" to be created.)
  # Note: need to check have_multiple_srcs or src_url.names_container()
  # because src_url could be a bucket containing a single object, named
  # as gs://bucket.
  if ((have_multiple_srcs or src_url_names_container or
       (exp_dst_url.IsFileUrl() and exp_dst_url.IsDirectory()))
      and not exp_dst_url.url_string.endswith(exp_dst_url.delim)):
    exp_dst_url = StorageUrlFromString('%s%s' % (exp_dst_url.url_string,
                                                 exp_dst_url.delim))

  # Making naming behavior match how things work with local Linux cp and mv
  # operations depends on many factors, including whether the destination is a
  # container, the plurality of the source(s), and whether the mv command is
  # being used:
  # 1. For the "mv" command that specifies a non-existent destination subdir,
  #    renaming should occur at the level of the src subdir, vs appending that
  #    subdir beneath the dst subdir like is done for copying. For example:
  #      gsutil rm -R gs://bucket
  #      gsutil cp -R dir1 gs://bucket
  #      gsutil cp -R dir2 gs://bucket/subdir1
  #      gsutil mv gs://bucket/subdir1 gs://bucket/subdir2
  #    would (if using cp naming behavior) end up with paths like:
  #      gs://bucket/subdir2/subdir1/dir2/.svn/all-wcprops
  #    whereas mv naming behavior should result in:
  #      gs://bucket/subdir2/dir2/.svn/all-wcprops
  # 2. Copying from directories, buckets, or bucket subdirs should result in
  #    objects/files mirroring the source directory hierarchy. For example:
  #      gsutil cp dir1/dir2 gs://bucket
  #    should create the object gs://bucket/dir2/file2, assuming dir1/dir2
  #    contains file2).
  #    To be consistent with Linux cp behavior, there's one more wrinkle when
  #    working with subdirs: The resulting object names depend on whether the
  #    destination subdirectory exists. For example, if gs://bucket/subdir
  #    exists, the command:
  #      gsutil cp -R dir1/dir2 gs://bucket/subdir
  #    should create objects named like gs://bucket/subdir/dir2/a/b/c. In
  #    contrast, if gs://bucket/subdir does not exist, this same command
  #    should create objects named like gs://bucket/subdir/a/b/c.
  # 3. Copying individual files or objects to dirs, buckets or bucket subdirs
  #    should result in objects/files named by the final source file name
  #    component. Example:
  #      gsutil cp dir1/*.txt gs://bucket
  #    should create the objects gs://bucket/f1.txt and gs://bucket/f2.txt,
  #    assuming dir1 contains f1.txt and f2.txt.

  recursive_move_to_new_subdir = False
  if (global_copy_helper_opts.perform_mv and recursion_requested
      and src_url_names_container and not have_existing_dest_subdir):
    # Case 1. Handle naming rules for bucket subdir mv. Here we want to
    # line up the src_url against its expansion, to find the base to build
    # the new name. For example, running the command:
    #   gsutil mv gs://bucket/abcd gs://bucket/xyz
    # when processing exp_src_url=gs://bucket/abcd/123
    # exp_src_url_tail should become /123
    # Note: mv.py code disallows wildcard specification of source URL.
    recursive_move_to_new_subdir = True
    exp_src_url_tail = (
        exp_src_url.url_string[len(src_url.url_string):])
    dst_key_name = '%s/%s' % (exp_dst_url.object_name.rstrip('/'),
                              exp_src_url_tail.strip('/'))

  elif src_url_names_container and (exp_dst_url.IsCloudUrl() or
                                    exp_dst_url.IsDirectory()):
    # Case 2. Container copy to a destination other than a file.
    # Build dst_key_name from subpath of exp_src_url past
    # where src_url ends. For example, for src_url=gs://bucket/ and
    # exp_src_url=gs://bucket/src_subdir/obj, dst_key_name should be
    # src_subdir/obj.
    src_url_path_sans_final_dir = GetPathBeforeFinalDir(src_url)
    dst_key_name = exp_src_url.versionless_url_string[
        len(src_url_path_sans_final_dir):].lstrip(src_url.delim)
    # Handle case where dst_url is a non-existent subdir.
    if not have_existing_dest_subdir:
      dst_key_name = dst_key_name.partition(src_url.delim)[-1]
    # Handle special case where src_url was a directory named with '.' or
    # './', so that running a command like:
    #   gsutil cp -r . gs://dest
    # will produce obj names of the form gs://dest/abc instead of
    # gs://dest/./abc.
    if dst_key_name.startswith('.%s' % os.sep):
      dst_key_name = dst_key_name[2:]

  else:
    # Case 3. Individual file/object: name by the final source component.
    dst_key_name = exp_src_url.object_name.rpartition(src_url.delim)[-1]

  if (not recursive_move_to_new_subdir and (
      exp_dst_url.IsFileUrl() or _ShouldTreatDstUrlAsBucketSubDir(
          have_multiple_srcs, exp_dst_url, have_existing_dest_subdir,
          src_url_names_container, recursion_requested))):
    # Prepend the destination subdir/directory path onto the key name,
    # avoiding a doubled delimiter when the dest already ends with one.
    if exp_dst_url.object_name and exp_dst_url.object_name.endswith(
        exp_dst_url.delim):
      dst_key_name = '%s%s%s' % (
          exp_dst_url.object_name.rstrip(exp_dst_url.delim),
          exp_dst_url.delim, dst_key_name)
    else:
      delim = exp_dst_url.delim if exp_dst_url.object_name else ''
      dst_key_name = '%s%s%s' % (exp_dst_url.object_name or '',
                                 delim, dst_key_name)

  # Translate source-scheme delimiters (e.g. os.sep) into the destination
  # scheme's delimiter when building the final URL.
  new_exp_dst_url = exp_dst_url.Clone()
  new_exp_dst_url.object_name = dst_key_name.replace(src_url.delim,
                                                     exp_dst_url.delim)
  return new_exp_dst_url
| 708 |
| 709 |
| 710 def _CreateDigestsFromDigesters(digesters): |
| 711 digests = {} |
| 712 if digesters: |
| 713 for alg in digesters: |
| 714 digests[alg] = base64.encodestring( |
| 715 digesters[alg].digest()).rstrip('\n') |
| 716 return digests |
| 717 |
| 718 |
def _CreateDigestsFromLocalFile(logger, algs, file_name, src_obj_metadata):
  """Creates a base64 CRC32C and/or MD5 digest from file_name.

  Args:
    logger: for outputting log messages.
    algs: list of algorithms to compute.
    file_name: file to digest.
    src_obj_metadata: metadata of source object.

  Returns:
    Dict of algorithm name : base 64 encoded digest
  """
  hash_dict = {}
  if 'md5' in algs:
    # Hashing a large file takes a while; let the user know what's happening.
    if src_obj_metadata.size and src_obj_metadata.size > TEN_MB:
      logger.info(
          'Computing MD5 for %s...', file_name)
    hash_dict['md5'] = md5()
  if 'crc32c' in algs:
    hash_dict['crc32c'] = crcmod.predefined.Crc('crc-32c')
  with open(file_name, 'rb') as fp:
    CalculateHashesFromContents(
        fp, hash_dict, ProgressCallbackWithBackoff(
            src_obj_metadata.size,
            FileProgressCallbackHandler(
                ConstructAnnounceText('Hashing', file_name), logger).call))
  digests = {}
  # items() (rather than the Python 2-only iteritems()) keeps this portable;
  # the dict has at most two entries, so materializing it is free.
  for alg_name, digest in hash_dict.items():
    digests[alg_name] = Base64EncodeHash(digest.hexdigest())
  return digests
| 749 |
| 750 |
| 751 def _CheckCloudHashes(logger, src_url, dst_url, src_obj_metadata, |
| 752 dst_obj_metadata): |
| 753 """Validates integrity of two cloud objects copied via daisy-chain. |
| 754 |
| 755 Args: |
| 756 logger: for outputting log messages. |
| 757 src_url: CloudUrl for source cloud object. |
| 758 dst_url: CloudUrl for destination cloud object. |
| 759 src_obj_metadata: Cloud Object metadata for object being downloaded from. |
| 760 dst_obj_metadata: Cloud Object metadata for object being uploaded to. |
| 761 |
| 762 Raises: |
| 763 CommandException: if cloud digests don't match local digests. |
| 764 """ |
| 765 checked_one = False |
| 766 download_hashes = {} |
| 767 upload_hashes = {} |
| 768 if src_obj_metadata.md5Hash: |
| 769 download_hashes['md5'] = src_obj_metadata.md5Hash |
| 770 if src_obj_metadata.crc32c: |
| 771 download_hashes['crc32c'] = src_obj_metadata.crc32c |
| 772 if dst_obj_metadata.md5Hash: |
| 773 upload_hashes['md5'] = dst_obj_metadata.md5Hash |
| 774 if dst_obj_metadata.crc32c: |
| 775 upload_hashes['crc32c'] = dst_obj_metadata.crc32c |
| 776 |
| 777 for alg, upload_b64_digest in upload_hashes.iteritems(): |
| 778 if alg not in download_hashes: |
| 779 continue |
| 780 |
| 781 download_b64_digest = download_hashes[alg] |
| 782 logger.debug( |
| 783 'Comparing source vs destination %s-checksum for %s. (%s/%s)', alg, |
| 784 dst_url, download_b64_digest, upload_b64_digest) |
| 785 if download_b64_digest != upload_b64_digest: |
| 786 raise CommandException( |
| 787 '%s signature for source object (%s) doesn\'t match ' |
| 788 'destination object digest (%s). Object (%s) will be deleted.' % ( |
| 789 alg, download_b64_digest, upload_b64_digest, dst_url)) |
| 790 checked_one = True |
| 791 if not checked_one: |
| 792 # One known way this can currently happen is when downloading objects larger |
| 793 # than 5GB from S3 (for which the etag is not an MD5). |
| 794 logger.warn( |
| 795 'WARNING: Found no hashes to validate object downloaded from %s and ' |
| 796 'uploaded to %s. Integrity cannot be assured without hashes.', |
| 797 src_url, dst_url) |
| 798 |
| 799 |
| 800 def _CheckHashes(logger, obj_url, obj_metadata, file_name, digests, |
| 801 is_upload=False): |
| 802 """Validates integrity by comparing cloud digest to local digest. |
| 803 |
| 804 Args: |
| 805 logger: for outputting log messages. |
| 806 obj_url: CloudUrl for cloud object. |
| 807 obj_metadata: Cloud Object being downloaded from or uploaded to. |
| 808 file_name: Local file name on disk being downloaded to or uploaded from. |
| 809 digests: Computed Digests for the object. |
| 810 is_upload: If true, comparing for an uploaded object (controls logging). |
| 811 |
| 812 Raises: |
| 813 CommandException: if cloud digests don't match local digests. |
| 814 """ |
| 815 local_hashes = digests |
| 816 cloud_hashes = {} |
| 817 if obj_metadata.md5Hash: |
| 818 cloud_hashes['md5'] = obj_metadata.md5Hash.rstrip('\n') |
| 819 if obj_metadata.crc32c: |
| 820 cloud_hashes['crc32c'] = obj_metadata.crc32c.rstrip('\n') |
| 821 |
| 822 checked_one = False |
| 823 for alg in local_hashes: |
| 824 if alg not in cloud_hashes: |
| 825 continue |
| 826 |
| 827 local_b64_digest = local_hashes[alg] |
| 828 cloud_b64_digest = cloud_hashes[alg] |
| 829 logger.debug( |
| 830 'Comparing local vs cloud %s-checksum for %s. (%s/%s)', alg, file_name, |
| 831 local_b64_digest, cloud_b64_digest) |
| 832 if local_b64_digest != cloud_b64_digest: |
| 833 |
| 834 raise CommandException( |
| 835 '%s signature computed for local file (%s) doesn\'t match ' |
| 836 'cloud-supplied digest (%s). %s (%s) will be deleted.' % ( |
| 837 alg, local_b64_digest, cloud_b64_digest, |
| 838 'Cloud object' if is_upload else 'Local file', |
| 839 obj_url if is_upload else file_name)) |
| 840 checked_one = True |
| 841 if not checked_one: |
| 842 if is_upload: |
| 843 logger.warn( |
| 844 'WARNING: Found no hashes to validate object uploaded to %s. ' |
| 845 'Integrity cannot be assured without hashes.', obj_url) |
| 846 else: |
| 847 # One known way this can currently happen is when downloading objects larger |
| 848 # than 5GB from S3 (for which the etag is not an MD5). |
| 849 logger.warn( |
| 850 'WARNING: Found no hashes to validate object downloaded to %s. ' |
| 851 'Integrity cannot be assured without hashes.', file_name) |
| 852 |
| 853 |
def IsNoClobberServerException(e):
  """Checks to see if the server attempted to clobber a file.

  In this case we specified via a precondition that we didn't want the file
  clobbered.

  Args:
    e: The Exception that was generated by a failed copy operation

  Returns:
    bool indicator - True indicates that the server did attempt to clobber
        an existing file.
  """
  if isinstance(e, PreconditionException):
    return True
  # A 412 (Precondition Failed) surfacing through a resumable upload also
  # indicates a precondition-protected clobber attempt.
  return isinstance(e, ResumableUploadException) and '412' in e.message
| 869 |
| 870 |
def CheckForDirFileConflict(exp_src_url, dst_url):
  """Checks whether copying exp_src_url into dst_url is not possible.

  This happens if a directory exists in local file system where a file
  needs to go or vice versa. In that case we print an error message and
  exits. Example: if the file "./x" exists and you try to do:
    gsutil cp gs://mybucket/x/y .
  the request can't succeed because it requires a directory where
  the file x exists.

  Note that we don't enforce any corresponding restrictions for buckets,
  because the flat namespace semantics for buckets doesn't prohibit such
  cases the way hierarchical file systems do. For example, if a bucket
  contains an object called gs://bucket/dir and then you run the command:
    gsutil cp file1 file2 gs://bucket/dir
  you'll end up with objects gs://bucket/dir, gs://bucket/dir/file1, and
  gs://bucket/dir/file2.

  Args:
    exp_src_url: Expanded source StorageUrl.
    dst_url: Destination StorageUrl.

  Raises:
    CommandException: if errors encountered.
  """
  if dst_url.IsCloudUrl():
    # Only file destinations can collide with the local filesystem.
    return
  dst_path = dst_url.object_name
  final_dir = os.path.dirname(dst_path)
  problem = None
  if os.path.isfile(final_dir):
    problem = ('Cannot retrieve %s because a file exists '
               'where a directory needs to be created (%s).' %
               (exp_src_url.url_string, final_dir))
  elif os.path.isdir(dst_path):
    problem = ('Cannot retrieve %s because a directory exists '
               '(%s) where the file needs to be created.' %
               (exp_src_url.url_string, dst_path))
  if problem:
    raise CommandException(problem)
| 909 |
| 910 |
def _PartitionFile(fp, file_size, src_url, content_type, canned_acl,
                   dst_bucket_url, random_prefix, tracker_file,
                   tracker_file_lock):
  """Partitions a file into FilePart objects to be uploaded and later composed.

  These objects, when composed, will match the original file. This entails
  splitting the file into parts, naming and forming a destination URL for each
  part, and also providing the PerformParallelUploadFileToObjectArgs
  corresponding to each part.

  Args:
    fp: The file object to be partitioned.
    file_size: The size of fp, in bytes.
    src_url: Source FileUrl from the original command.
    content_type: content type for the component and final objects.
    canned_acl: The user-provided canned_acl, if applicable.
    dst_bucket_url: CloudUrl for the destination bucket
    random_prefix: The randomly-generated prefix used to prevent collisions
                   among the temporary component names.
    tracker_file: The path to the parallel composite upload tracker file.
    tracker_file_lock: The lock protecting access to the tracker file.

  Returns:
    dst_args: Dict mapping temporary component object name to the
        PerformParallelUploadFileToObjectArgs used to upload it.
  """
  parallel_composite_upload_component_size = HumanReadableToBytes(
      config.get('GSUtil', 'parallel_composite_upload_component_size',
                 DEFAULT_PARALLEL_COMPOSITE_UPLOAD_COMPONENT_SIZE))
  (num_components, component_size) = _GetPartitionInfo(
      file_size, MAX_COMPOSE_ARITY, parallel_composite_upload_component_size)

  # "Salt" the object name with something a user is very unlikely to have
  # used in an object name, then hash the extended name to make sure
  # we don't run into problems with name length. Using a deterministic
  # naming scheme for the temporary components allows users to take
  # advantage of resumable uploads for each component. The digest depends
  # only on the source file name, so compute it once rather than per
  # component.
  encoded_name = (PARALLEL_UPLOAD_STATIC_SALT + fp.name).encode(UTF8)
  content_md5 = md5()
  content_md5.update(encoded_name)
  digest = content_md5.hexdigest()

  dst_args = {}  # Arguments to create commands and pass to subprocesses.
  for i in range(num_components):
    temp_file_name = (random_prefix + PARALLEL_UPLOAD_TEMP_NAMESPACE +
                      digest + '_' + str(i))
    tmp_dst_url = dst_bucket_url.Clone()
    tmp_dst_url.object_name = temp_file_name

    if i < (num_components - 1):
      # Every component except possibly the last is the same size.
      file_part_length = component_size
    else:
      # The last component just gets all of the remaining bytes.
      file_part_length = (file_size - ((num_components - 1) * component_size))
    offset = i * component_size
    func_args = PerformParallelUploadFileToObjectArgs(
        fp.name, offset, file_part_length, src_url, tmp_dst_url, canned_acl,
        content_type, tracker_file, tracker_file_lock)
    dst_args[temp_file_name] = func_args

  return dst_args
| 973 |
| 974 |
def _DoParallelCompositeUpload(fp, src_url, dst_url, dst_obj_metadata,
                               canned_acl, file_size, preconditions, gsutil_api,
                               command_obj, copy_exception_handler):
  """Uploads a local file to a cloud object using parallel composite upload.

  The file is partitioned into parts, and then the parts are uploaded in
  parallel, composed to form the original destination object, and deleted.

  Args:
    fp: The file object to be uploaded.
    src_url: FileUrl representing the local file.
    dst_url: CloudUrl representing the destination file.
    dst_obj_metadata: apitools Object describing the destination object.
    canned_acl: The canned acl to apply to the object, if any.
    file_size: The size of the source file in bytes.
    preconditions: Cloud API Preconditions for the final object.
    gsutil_api: gsutil Cloud API instance to use.
    command_obj: Command object (for calling Apply).
    copy_exception_handler: Copy exception handler (for use in Apply).

  Returns:
    Elapsed upload time, uploaded Object with generation, crc32c, and size
    fields populated.

  Raises:
    CommandException: if one or more components failed to upload (the
        successfully uploaded components are deliberately left in place so a
        retry can resume from them).
  """
  start_time = time.time()
  dst_bucket_url = StorageUrlFromString(dst_url.bucket_url_string)
  api_selector = gsutil_api.GetApiSelector(provider=dst_url.scheme)
  # Determine which components, if any, have already been successfully
  # uploaded.
  tracker_file = GetTrackerFilePath(dst_url, TrackerFileType.PARALLEL_UPLOAD,
                                    api_selector, src_url)
  tracker_file_lock = CreateLock()
  (random_prefix, existing_components) = (
      _ParseParallelUploadTrackerFile(tracker_file, tracker_file_lock))

  # Create the initial tracker file for the upload.
  _CreateParallelUploadTrackerFile(tracker_file, random_prefix,
                                   existing_components, tracker_file_lock)

  # Get the set of all components that should be uploaded.
  dst_args = _PartitionFile(
      fp, file_size, src_url, dst_obj_metadata.contentType, canned_acl,
      dst_bucket_url, random_prefix, tracker_file, tracker_file_lock)

  (components_to_upload, existing_components, existing_objects_to_delete) = (
      FilterExistingComponents(dst_args, existing_components, dst_bucket_url,
                               gsutil_api))

  # In parallel, copy all of the file parts that haven't already been
  # uploaded to temporary objects.
  cp_results = command_obj.Apply(
      _PerformParallelUploadFileToObject, components_to_upload,
      copy_exception_handler, ('op_failure_count', 'total_bytes_transferred'),
      arg_checker=gslib.command.DummyArgChecker,
      parallel_operations_override=True, should_return_results=True)
  uploaded_components = []
  for cp_result in cp_results:
    # Index 2 of each result is used below as a component URL (it supplies
    # object_name / HasGeneration); presumably the uploaded component's
    # StorageUrl -- confirm against _PerformParallelUploadFileToObject.
    uploaded_components.append(cp_result[2])
  components = uploaded_components + existing_components

  if len(components) == len(dst_args):
    # Only try to compose if all of the components were uploaded successfully.

    def _GetComponentNumber(component):
      # Component names end in '_<index>' (see _PartitionFile's naming scheme).
      return int(component.object_name[component.object_name.rfind('_')+1:])
    # Sort the components so that they will be composed in the correct order.
    components = sorted(components, key=_GetComponentNumber)

    request_components = []
    for component_url in components:
      src_obj_metadata = (
          apitools_messages.ComposeRequest.SourceObjectsValueListEntry(
              name=component_url.object_name))
      if component_url.HasGeneration():
        # NOTE: long() is Python 2-only.
        src_obj_metadata.generation = long(component_url.generation)
      request_components.append(src_obj_metadata)

    composed_object = gsutil_api.ComposeObject(
        request_components, dst_obj_metadata, preconditions=preconditions,
        provider=dst_url.scheme, fields=['generation', 'crc32c', 'size'])

    try:
      # Make sure only to delete things that we know were successfully
      # uploaded (as opposed to all of the objects that we attempted to
      # create) so that we don't delete any preexisting objects, except for
      # those that were uploaded by a previous, failed run and have since
      # changed (but still have an old generation lying around).
      objects_to_delete = components + existing_objects_to_delete
      command_obj.Apply(_DeleteObjectFn, objects_to_delete, _RmExceptionHandler,
                        arg_checker=gslib.command.DummyArgChecker,
                        parallel_operations_override=True)
    except Exception:  # pylint: disable=broad-except
      # If some of the delete calls fail, don't cause the whole command to
      # fail. The copy was successful iff the compose call succeeded, so
      # reduce this to a warning.
      logging.warning(
          'Failed to delete some of the following temporary objects:\n' +
          '\n'.join(dst_args.keys()))
    finally:
      # The upload is complete (or failed past the compose step), so the
      # tracker file is no longer needed.
      with tracker_file_lock:
        if os.path.exists(tracker_file):
          os.unlink(tracker_file)
  else:
    # Some of the components failed to upload. In this case, we want to exit
    # without deleting the objects.
    raise CommandException(
        'Some temporary components were not uploaded successfully. '
        'Please retry this upload.')

  elapsed_time = time.time() - start_time
  return elapsed_time, composed_object
| 1086 |
| 1087 |
def _ShouldDoParallelCompositeUpload(logger, allow_splitting, src_url, dst_url,
                                     file_size, canned_acl=None):
  """Determines whether parallel composite upload strategy should be used.

  Args:
    logger: for outputting log messages.
    allow_splitting: If false, then this function returns false.
    src_url: FileUrl corresponding to a local file.
    dst_url: CloudUrl corresponding to destination cloud object.
    file_size: The size of the source file, in bytes.
    canned_acl: Canned ACL to apply to destination object, if any.

  Returns:
    True iff a parallel upload should be performed on the source file.
  """
  global suggested_parallel_composites
  parallel_composite_upload_threshold = HumanReadableToBytes(config.get(
      'GSUtil', 'parallel_composite_upload_threshold',
      DEFAULT_PARALLEL_COMPOSITE_UPLOAD_THRESHOLD))

  eligible_regardless_of_size = (
      allow_splitting  # Don't split the pieces multiple times.
      and not src_url.IsStream()  # We can't partition streams.
      and dst_url.scheme == 'gs'  # Compose is only for gs.
      and not canned_acl)  # TODO: Implement canned ACL support for compose.

  # Since parallel composite uploads are disabled by default, make user aware
  # of them.
  # TODO: Once compiled crcmod is being distributed by major Linux
  # distributions remove this check.
  should_suggest = (
      eligible_regardless_of_size and parallel_composite_upload_threshold == 0
      and file_size >= PARALLEL_COMPOSITE_SUGGESTION_THRESHOLD
      and not suggested_parallel_composites)
  if should_suggest:
    logger.info('\n'.join(textwrap.wrap(
        '==> NOTE: You are uploading one or more large file(s), which would '
        'run significantly faster if you enable parallel composite uploads. '
        'This feature can be enabled by editing the '
        '"parallel_composite_upload_threshold" value in your .boto '
        'configuration file. However, note that if you do this you and any '
        'users that download such composite files will need to have a compiled '
        'crcmod installed (see "gsutil help crcmod").')) + '\n')
    suggested_parallel_composites = True

  return (eligible_regardless_of_size
          and parallel_composite_upload_threshold > 0
          and file_size >= parallel_composite_upload_threshold)
| 1134 |
| 1135 |
def ExpandUrlToSingleBlr(url_str, gsutil_api, debug, project_id):
  """Expands wildcard if present in url_str.

  Args:
    url_str: String representation of requested url.
    gsutil_api: gsutil Cloud API instance to use.
    debug: debug level to use (for iterators).
    project_id: project ID to use (for iterators).

  Returns:
    (exp_url, have_existing_dst_container)
    where exp_url is a StorageUrl
    and have_existing_dst_container is a bool indicating whether
    exp_url names an existing directory, bucket, or bucket subdirectory.
    In the case where we match a subdirectory AND an object, the
    object is returned.

  Raises:
    CommandException: if url_str matched more than 1 URL.
  """
  # Handle wildcarded url case.
  if ContainsWildcard(url_str):
    matches = list(CreateWildcardIterator(url_str, gsutil_api,
                                          debug=debug,
                                          project_id=project_id))
    if len(matches) != 1:
      raise CommandException('Destination (%s) must match exactly 1 URL' %
                             url_str)
    blr = matches[0]
    # BLR is either an OBJECT, PREFIX, or BUCKET; the latter two represent
    # directories.
    return (StorageUrlFromString(blr.url_string), not blr.IsObject())

  storage_url = StorageUrlFromString(url_str)

  # Handle non-wildcarded url:
  if storage_url.IsFileUrl():
    return (storage_url, storage_url.IsDirectory())

  # At this point we have a cloud URL.
  if storage_url.IsBucket():
    return (storage_url, True)

  # For object/prefix URLs check 3 cases: (a) if the name ends with '/' treat
  # as a subdir; otherwise, use the wildcard iterator with url to
  # find if (b) there's a Prefix matching url, or (c) name is of form
  # dir_$folder$ (and in both these cases also treat dir as a subdir).
  # Cloud subdirs are always considered to be an existing container.
  if IsCloudSubdirPlaceholder(storage_url):
    return (storage_url, True)

  # Check for the special case where we have a folder marker object.
  folder_expansion = CreateWildcardIterator(
      url_str + '_$folder$', gsutil_api, debug=debug,
      project_id=project_id).IterAll(
          bucket_listing_fields=['name'])
  if any(True for _ in folder_expansion):
    return (storage_url, True)

  blr_expansion = CreateWildcardIterator(url_str, gsutil_api,
                                         debug=debug,
                                         project_id=project_id).IterAll(
                                             bucket_listing_fields=['name'])
  if any(blr.IsPrefix() for blr in blr_expansion):
    return (storage_url, True)

  return (storage_url, False)
| 1204 |
| 1205 |
def FixWindowsNaming(src_url, dst_url):
  """Translates Windows pathnames to cloud pathnames.

  Rewrites the destination URL built by ConstructDstUrl().

  Args:
    src_url: Source StorageUrl to be copied.
    dst_url: The destination StorageUrl built by ConstructDstUrl().

  Returns:
    StorageUrl to use for copy.
  """
  came_from_windows_file = src_url.IsFileUrl() and src_url.delim == '\\'
  if came_from_windows_file and dst_url.IsCloudUrl():
    # Backslashes in the destination name came from the local path separator;
    # the cloud object name should use forward slashes instead.
    translated = re.sub(r'\\', '/', dst_url.url_string)
    return StorageUrlFromString(translated)
  return dst_url
| 1223 |
| 1224 |
def StdinIterator():
  """A generator function that returns lines from stdin."""
  for raw_line in sys.stdin:
    # rstrip() drops all trailing whitespace, including CR/LF line endings.
    yield raw_line.rstrip()
| 1230 |
| 1231 |
def SrcDstSame(src_url, dst_url):
  """Checks if src_url and dst_url represent the same object or file.

  We don't handle anything about hard or symbolic links.

  Args:
    src_url: Source StorageUrl.
    dst_url: Destination StorageUrl.

  Returns:
    Bool indicator.
  """
  if not (src_url.IsFileUrl() and dst_url.IsFileUrl()):
    # For cloud URLs, both the URL string and the generation must agree.
    return (src_url.url_string == dst_url.url_string and
            src_url.generation == dst_url.generation)
  # Translate a/b/./c to a/b/c, so src=dst comparison below works.
  return (os.path.normpath(src_url.object_name) ==
          os.path.normpath(dst_url.object_name))
| 1252 |
| 1253 |
| 1254 class _HaltingCopyCallbackHandler(object): |
| 1255 """Test callback handler for intentionally stopping a resumable transfer.""" |
| 1256 |
| 1257 def __init__(self, is_upload, display_url, halt_at_byte, logger): |
| 1258 self.is_upload = is_upload |
| 1259 self.display_url = display_url |
| 1260 self.halt_at_byte = halt_at_byte |
| 1261 self.logger = logger |
| 1262 |
| 1263 # pylint: disable=invalid-name |
| 1264 def call(self, total_bytes_transferred, total_size): |
| 1265 """Forcibly exits if the transfer has passed the halting point.""" |
| 1266 if total_bytes_transferred >= self.halt_at_byte: |
| 1267 if self.logger.isEnabledFor(logging.INFO): |
| 1268 sys.stderr.write( |
| 1269 'Halting transfer after byte %s. %s/%s transferred.\r\n' % ( |
| 1270 self.halt_at_byte, MakeHumanReadable(total_bytes_transferred), |
| 1271 MakeHumanReadable(total_size))) |
| 1272 if self.is_upload: |
| 1273 raise ResumableUploadException('Artifically halting upload.') |
| 1274 else: |
| 1275 raise ResumableDownloadException('Artifically halting download.') |
| 1276 |
| 1277 |
| 1278 def _LogCopyOperation(logger, src_url, dst_url, dst_obj_metadata): |
| 1279 """Logs copy operation, including Content-Type if appropriate. |
| 1280 |
| 1281 Args: |
| 1282 logger: logger instance to use for output. |
| 1283 src_url: Source StorageUrl. |
| 1284 dst_url: Destination StorageUrl. |
| 1285 dst_obj_metadata: Object-specific metadata that should be overidden during |
| 1286 the copy. |
| 1287 """ |
| 1288 if (dst_url.IsCloudUrl() and dst_obj_metadata and |
| 1289 dst_obj_metadata.contentType): |
| 1290 content_type_msg = ' [Content-Type=%s]' % dst_obj_metadata.contentType |
| 1291 else: |
| 1292 content_type_msg = '' |
| 1293 if src_url.IsFileUrl() and src_url.IsStream(): |
| 1294 logger.info('Copying from <STDIN>%s...', content_type_msg) |
| 1295 else: |
| 1296 logger.info('Copying %s%s...', src_url.url_string, content_type_msg) |
| 1297 |
| 1298 |
# pylint: disable=undefined-variable
def _CopyObjToObjInTheCloud(src_url, src_obj_size, dst_url,
                            dst_obj_metadata, preconditions, gsutil_api):
  """Performs copy-in-the cloud from specified src to dest object.

  Args:
    src_url: Source CloudUrl.
    src_obj_size: Size of source object.
    dst_url: Destination CloudUrl.
    dst_obj_metadata: Object-specific metadata that should be overidden during
        the copy.
    preconditions: Preconditions to use for the copy.
    gsutil_api: gsutil Cloud API instance to use for the copy.

  Returns:
    (elapsed_time, bytes_transferred, dst_url with generation,
    md5 hash of destination) excluding overhead like initial GET.

  Raises:
    CommandException: if errors encountered.
  """
  start_time = time.time()

  copied_obj = gsutil_api.CopyObject(
      src_url.bucket_name, src_url.object_name,
      src_generation=src_url.generation, dst_obj_metadata=dst_obj_metadata,
      canned_acl=global_copy_helper_opts.canned_acl,
      preconditions=preconditions, provider=dst_url.scheme,
      fields=UPLOAD_RETURN_FIELDS)

  elapsed_time = time.time() - start_time

  result_url = dst_url.Clone()
  result_url.generation = GenerationFromUrlAndString(result_url,
                                                     copied_obj.generation)

  return (elapsed_time, src_obj_size, result_url, copied_obj.md5Hash)
| 1336 |
| 1337 |
def _CheckFreeSpace(path):
  """Return path/drive free space (in bytes).

  On Windows the path argument is ignored and the SystemDrive environment
  variable's drive is checked instead; on POSIX, statvfs(path) is used.
  """
  if IS_WINDOWS:
    # pylint: disable=g-import-not-at-top
    # The ctypes names used below (WINFUNCTYPE, windll, WinError, c_int,
    # c_wchar_p, c_char_p, c_uint64, POINTER) are presumably imported
    # conditionally elsewhere in this file on Windows -- not visible in this
    # block; confirm before modifying.
    try:
      # Prefer the wide-character (Unicode) API; fall back to the ANSI
      # variant below if it's unavailable.
      # pylint: disable=invalid-name
      get_disk_free_space_ex = WINFUNCTYPE(c_int, c_wchar_p,
                                           POINTER(c_uint64),
                                           POINTER(c_uint64),
                                           POINTER(c_uint64))
      get_disk_free_space_ex = get_disk_free_space_ex(
          ('GetDiskFreeSpaceExW', windll.kernel32), (
              (1, 'lpszPathName'),
              (2, 'lpFreeUserSpace'),
              (2, 'lpTotalSpace'),
              (2, 'lpFreeSpace'),))
    except AttributeError:
      get_disk_free_space_ex = WINFUNCTYPE(c_int, c_char_p,
                                           POINTER(c_uint64),
                                           POINTER(c_uint64),
                                           POINTER(c_uint64))
      get_disk_free_space_ex = get_disk_free_space_ex(
          ('GetDiskFreeSpaceExA', windll.kernel32), (
              (1, 'lpszPathName'),
              (2, 'lpFreeUserSpace'),
              (2, 'lpTotalSpace'),
              (2, 'lpFreeSpace'),))

    def GetDiskFreeSpaceExErrCheck(result, unused_func, args):
      # The Windows API signals failure with a zero result.
      if not result:
        raise WinError()
      # args[1] is the lpFreeUserSpace output parameter.
      return args[1].value
    get_disk_free_space_ex.errcheck = GetDiskFreeSpaceExErrCheck

    return get_disk_free_space_ex(os.getenv('SystemDrive'))
  else:
    # f_frsize is the fragment (block) size; f_bavail is the count of free
    # blocks available to unprivileged users (see os.statvfs).
    (_, f_frsize, _, _, f_bavail, _, _, _, _, _) = os.statvfs(path)
    return f_frsize * f_bavail
| 1376 |
| 1377 |
| 1378 def _SetContentTypeFromFile(src_url, dst_obj_metadata): |
| 1379 """Detects and sets Content-Type if src_url names a local file. |
| 1380 |
| 1381 Args: |
| 1382 src_url: Source StorageUrl. |
| 1383 dst_obj_metadata: Object-specific metadata that should be overidden during |
| 1384 the copy. |
| 1385 """ |
| 1386 # contentType == '' if user requested default type. |
| 1387 if (dst_obj_metadata.contentType is None and src_url.IsFileUrl() |
| 1388 and not src_url.IsStream()): |
| 1389 # Only do content type recognition if src_url is a file. Object-to-object |
| 1390 # copies with no -h Content-Type specified re-use the content type of the |
| 1391 # source object. |
| 1392 object_name = src_url.object_name |
| 1393 content_type = None |
| 1394 # Streams (denoted by '-') are expected to be 'application/octet-stream' |
| 1395 # and 'file' would partially consume them. |
| 1396 if object_name != '-': |
| 1397 if config.getbool('GSUtil', 'use_magicfile', False): |
| 1398 p = subprocess.Popen(['file', '--mime-type', object_name], |
| 1399 stdout=subprocess.PIPE, stderr=subprocess.PIPE) |
| 1400 output, error = p.communicate() |
| 1401 if p.returncode != 0 or error: |
| 1402 raise CommandException( |
| 1403 'Encountered error running "file --mime-type %s" ' |
| 1404 '(returncode=%d).\n%s' % (object_name, p.returncode, error)) |
| 1405 # Parse output by removing line delimiter and splitting on last ": |
| 1406 content_type = output.rstrip().rpartition(': ')[2] |
| 1407 else: |
| 1408 content_type = mimetypes.guess_type(object_name)[0] |
| 1409 if not content_type: |
| 1410 content_type = DEFAULT_CONTENT_TYPE |
| 1411 dst_obj_metadata.contentType = content_type |
| 1412 |
| 1413 |
| 1414 # pylint: disable=undefined-variable |
def _UploadFileToObjectNonResumable(src_url, src_obj_filestream,
                                    src_obj_size, dst_url, dst_obj_metadata,
                                    preconditions, gsutil_api, logger):
  """Uploads the file using a non-resumable strategy.

  Args:
    src_url: Source StorageUrl to upload.
    src_obj_filestream: File pointer to uploadable bytes.
    src_obj_size: Size of the source object.
    dst_url: Destination StorageUrl for the upload.
    dst_obj_metadata: Metadata for the target object.
    preconditions: Preconditions for the upload, if any.
    gsutil_api: gsutil Cloud API instance to use for the upload.
    logger: For outputting log messages.

  Returns:
    Elapsed upload time, uploaded Object with generation, md5, and size fields
    populated.
  """
  progress_callback = FileProgressCallbackHandler(
      ConstructAnnounceText('Uploading', dst_url.url_string), logger).call
  # Keyword arguments common to the streaming and fixed-size upload calls.
  upload_kwargs = dict(
      object_metadata=dst_obj_metadata,
      canned_acl=global_copy_helper_opts.canned_acl,
      preconditions=preconditions,
      progress_callback=progress_callback,
      provider=dst_url.scheme,
      fields=UPLOAD_RETURN_FIELDS)
  start_time = time.time()

  if src_url.IsStream():
    # TODO: gsutil-beta: Provide progress callbacks for streaming uploads.
    uploaded_object = gsutil_api.UploadObjectStreaming(
        src_obj_filestream, **upload_kwargs)
  else:
    uploaded_object = gsutil_api.UploadObject(
        src_obj_filestream, size=src_obj_size, **upload_kwargs)
  elapsed_time = time.time() - start_time

  return elapsed_time, uploaded_object
| 1455 |
| 1456 |
# pylint: disable=undefined-variable
def _UploadFileToObjectResumable(src_url, src_obj_filestream,
                                 src_obj_size, dst_url, dst_obj_metadata,
                                 preconditions, gsutil_api, logger):
  """Uploads the file using a resumable strategy.

  Args:
    src_url: Source FileUrl to upload. Must not be a stream.
    src_obj_filestream: File pointer to uploadable bytes.
    src_obj_size: Size of the source object.
    dst_url: Destination StorageUrl for the upload.
    dst_obj_metadata: Metadata for the target object.
    preconditions: Preconditions for the upload, if any.
    gsutil_api: gsutil Cloud API instance to use for the upload.
    logger: for outputting log messages.

  Returns:
    Elapsed upload time, uploaded Object with generation, md5, and size fields
    populated.
  """
  tracker_file_name = GetTrackerFilePath(
      dst_url, TrackerFileType.UPLOAD,
      gsutil_api.GetApiSelector(provider=dst_url.scheme))

  def _UploadTrackerCallback(serialization_data):
    """Creates a new tracker file for starting an upload from scratch.

    This function is called by the gsutil Cloud API implementation; the
    serialization data is implementation-specific.

    Args:
      serialization_data: Serialization data used in resuming the upload.
    """
    try:
      with open(tracker_file_name, 'w') as tracker_file:
        tracker_file.write(str(serialization_data))
    except IOError as e:
      raise CommandException(TRACKER_FILE_UNWRITABLE_EXCEPTION_TEXT %
                             (tracker_file_name, e.strerror))

  # This contains the upload URL, which will uniquely identify the
  # destination object.
  tracker_data = _GetUploadTrackerData(tracker_file_name, logger)
  if tracker_data:
    logger.info(
        'Resuming upload for %s', src_url.url_string)

  progress_callback = FileProgressCallbackHandler(
      ConstructAnnounceText('Uploading', dst_url.url_string), logger).call
  if global_copy_helper_opts.halt_at_byte:
    # Test hook: force the copy to halt mid-transfer.
    progress_callback = _HaltingCopyCallbackHandler(
        True, dst_url, global_copy_helper_opts.halt_at_byte, logger).call

  # The tracker file is kept only while a retry could use it: it is removed
  # on success and on a non-retryable abort, but preserved on other errors.
  retryable = True
  start_time = time.time()
  try:
    uploaded_object = gsutil_api.UploadObjectResumable(
        src_obj_filestream, object_metadata=dst_obj_metadata,
        canned_acl=global_copy_helper_opts.canned_acl,
        preconditions=preconditions, provider=dst_url.scheme,
        size=src_obj_size, serialization_data=tracker_data,
        fields=UPLOAD_RETURN_FIELDS,
        tracker_callback=_UploadTrackerCallback,
        progress_callback=progress_callback)
    retryable = False
  except ResumableUploadAbortException:
    retryable = False
    raise
  finally:
    if not retryable:
      _DeleteTrackerFile(tracker_file_name)
  elapsed_time = time.time() - start_time

  return elapsed_time, uploaded_object
| 1538 |
| 1539 |
def _CompressFileForUpload(src_url, src_obj_filestream, src_obj_size, logger):
  """Compresses a to-be-uploaded local file to save bandwidth.

  Args:
    src_url: Source FileUrl.
    src_obj_filestream: Read stream of the source file - will be consumed
                        and closed.
    src_obj_size: Size of the source file.
    logger: for outputting log messages.

  Returns:
    StorageUrl path to compressed file, compressed file size.

  Raises:
    CommandException: if there is inadequate temp space for compression.
  """
  # TODO: Compress using a streaming model as opposed to all at once here.
  if src_obj_size >= MIN_SIZE_COMPUTE_LOGGING:
    logger.info(
        'Compressing %s (to tmp)...', src_url)
  (gzip_fh, gzip_path) = tempfile.mkstemp()
  gzip_fp = None
  try:
    try:
      # Check for temp space. Assume the compressed object is at most 2x
      # the size of the object (normally should compress to smaller than
      # the object)
      if _CheckFreeSpace(gzip_path) < 2*int(src_obj_size):
        raise CommandException('Inadequate temp space available to compress '
                               '%s. See the CHANGING TEMP DIRECTORIES section '
                               'of "gsutil help cp" for more info.' % src_url)
      gzip_fp = gzip.open(gzip_path, 'wb')
      data = src_obj_filestream.read(GZIP_CHUNK_SIZE)
      while data:
        gzip_fp.write(data)
        data = src_obj_filestream.read(GZIP_CHUNK_SIZE)
    finally:
      if gzip_fp:
        gzip_fp.close()
      os.close(gzip_fh)
      src_obj_filestream.close()
  except Exception:
    # The caller only deletes the temp file after a successful upload, so
    # remove it here on failure to avoid leaking it (the original code left
    # it behind on the inadequate-space and write-error paths).
    try:
      os.unlink(gzip_path)
    except OSError:
      pass
    raise
  gzip_size = os.path.getsize(gzip_path)
  return StorageUrlFromString(gzip_path), gzip_size
| 1579 |
| 1580 |
def _UploadFileToObject(src_url, src_obj_filestream, src_obj_size,
                        dst_url, dst_obj_metadata, preconditions, gsutil_api,
                        logger, command_obj, copy_exception_handler,
                        gzip_exts=None, allow_splitting=True):
  """Uploads a local file to an object.

  Args:
    src_url: Source FileUrl.
    src_obj_filestream: Read stream of the source file to be read and closed.
    src_obj_size: Size of the source file.
    dst_url: Destination CloudUrl.
    dst_obj_metadata: Metadata to be applied to the destination object.
    preconditions: Preconditions to use for the copy.
    gsutil_api: gsutil Cloud API to use for the copy.
    logger: for outputting log messages.
    command_obj: command object for use in Apply in parallel composite uploads.
    copy_exception_handler: For handling copy exceptions during Apply.
    gzip_exts: List of file extensions to gzip prior to upload, if any.
    allow_splitting: Whether to allow the file to be split into component
                     pieces for an parallel composite upload.

  Returns:
    (elapsed_time, bytes_transferred, dst_url with generation,
    md5 hash of destination) excluding overhead like initial GET.

  Raises:
    CommandException: if errors encountered.
  """
  if not dst_obj_metadata or not dst_obj_metadata.contentLanguage:
    content_language = config.get_value('GSUtil', 'content_language')
    if content_language:
      dst_obj_metadata.contentLanguage = content_language

  fname_parts = src_url.object_name.split('.')
  upload_url = src_url
  upload_stream = src_obj_filestream
  upload_size = src_obj_size
  zipped_file = False
  if gzip_exts and len(fname_parts) > 1 and fname_parts[-1] in gzip_exts:
    # Compress to a temp file; the temp file is deleted in the finally
    # block below after the upload completes (or fails).
    upload_url, upload_size = _CompressFileForUpload(
        src_url, src_obj_filestream, src_obj_size, logger)
    upload_stream = open(upload_url.object_name, 'rb')
    dst_obj_metadata.contentEncoding = 'gzip'
    zipped_file = True

  elapsed_time = None
  uploaded_object = None
  hash_algs = GetUploadHashAlgs()
  digesters = dict((alg, hash_algs[alg]()) for alg in hash_algs or {})

  parallel_composite_upload = _ShouldDoParallelCompositeUpload(
      logger, allow_splitting, upload_url, dst_url, src_obj_size,
      canned_acl=global_copy_helper_opts.canned_acl)

  if not parallel_composite_upload and hash_algs:
    # Parallel composite uploads calculate hashes per-component in subsequent
    # calls to this function, but the composition of the final object is a
    # cloud-only operation.
    wrapped_filestream = HashingFileUploadWrapper(upload_stream, digesters,
                                                  hash_algs, upload_url, logger)
  else:
    wrapped_filestream = upload_stream

  try:
    if parallel_composite_upload:
      elapsed_time, uploaded_object = _DoParallelCompositeUpload(
          upload_stream, upload_url, dst_url, dst_obj_metadata,
          global_copy_helper_opts.canned_acl, upload_size, preconditions,
          gsutil_api, command_obj, copy_exception_handler)
    elif upload_size < ResumableThreshold() or src_url.IsStream():
      elapsed_time, uploaded_object = _UploadFileToObjectNonResumable(
          upload_url, wrapped_filestream, upload_size, dst_url,
          dst_obj_metadata, preconditions, gsutil_api, logger)
    else:
      elapsed_time, uploaded_object = _UploadFileToObjectResumable(
          upload_url, wrapped_filestream, upload_size, dst_url,
          dst_obj_metadata, preconditions, gsutil_api, logger)

  finally:
    if zipped_file:
      try:
        os.unlink(upload_url.object_name)
      # Windows sometimes complains the temp file is locked when you try to
      # delete it.
      except Exception:  # pylint: disable=broad-except
        logger.warning(
            'Could not delete %s. This can occur in Windows because the '
            'temporary file is still locked.', upload_url.object_name)
    # In the gzip case, this is the gzip stream. _CompressFileForUpload will
    # have already closed the original source stream.
    upload_stream.close()

  if not parallel_composite_upload:
    try:
      digests = _CreateDigestsFromDigesters(digesters)
      _CheckHashes(logger, dst_url, uploaded_object, src_url.object_name,
                   digests, is_upload=True)
    except CommandException as e:
      # If the digest doesn't match, delete the object.
      if 'doesn\'t match cloud-supplied digest' in str(e):
        gsutil_api.DeleteObject(dst_url.bucket_name, dst_url.object_name,
                                generation=uploaded_object.generation,
                                provider=dst_url.scheme)
      raise

  result_url = dst_url.Clone()
  # Compute the version-specific generation directly; the original code's
  # intermediate assignment of the raw generation was a dead store.
  result_url.generation = GenerationFromUrlAndString(
      result_url, uploaded_object.generation)

  return (elapsed_time, uploaded_object.size, result_url,
          uploaded_object.md5Hash)
| 1694 |
| 1695 |
# TODO: Refactor this long function into smaller pieces.
# pylint: disable=too-many-statements
def _DownloadObjectToFile(src_url, src_obj_metadata, dst_url,
                          gsutil_api, logger, test_method=None):
  """Downloads an object to a local file.

  Args:
    src_url: Source CloudUrl.
    src_obj_metadata: Metadata from the source object.
    dst_url: Destination FileUrl.
    gsutil_api: gsutil Cloud API instance to use for the download.
    logger: for outputting log messages.
    test_method: Optional test method for modifying the file before validation
                 during unit tests.
  Returns:
    (elapsed_time, bytes_transferred, dst_url, md5), excluding overhead like
    initial GET.

  Raises:
    CommandException: if errors encountered.
  """
  file_name = dst_url.object_name
  dir_name = os.path.dirname(file_name)
  if dir_name and not os.path.exists(dir_name):
    # Do dir creation in try block so can ignore case where dir already
    # exists. This is needed to avoid a race condition when running gsutil
    # -m cp.
    try:
      os.makedirs(dir_name)
    except OSError as e:
      if e.errno != errno.EEXIST:
        raise
  api_selector = gsutil_api.GetApiSelector(provider=src_url.scheme)
  # For gzipped objects download to a temp file and unzip. For the XML API,
  # this represents the result of a HEAD request. For the JSON API, this is
  # the stored encoding which the service may not respect. However, if the
  # server sends decompressed bytes for a file that is stored compressed
  # (double compressed case), there is no way we can validate the hash and
  # we will fail our hash check for the object.
  if (src_obj_metadata.contentEncoding and
      src_obj_metadata.contentEncoding.lower().endswith('gzip')):
    # We can't use tempfile.mkstemp() here because we need a predictable
    # filename for resumable downloads.
    download_file_name = _GetDownloadZipFileName(file_name)
    logger.info(
        'Downloading to temp gzip filename %s', download_file_name)
    need_to_unzip = True
  else:
    download_file_name = file_name
    need_to_unzip = False

  if download_file_name.endswith(dst_url.delim):
    logger.warning('\n'.join(textwrap.wrap(
        'Skipping attempt to download to filename ending with slash (%s). This '
        'typically happens when using gsutil to download from a subdirectory '
        'created by the Cloud Console (https://cloud.google.com/console)'
        % download_file_name)))
    return (0, 0, dst_url, '')

  # Set up hash digesters.
  hash_algs = GetDownloadHashAlgs(
      logger, src_has_md5=src_obj_metadata.md5Hash,
      src_has_crc32c=src_obj_metadata.crc32c)
  digesters = dict((alg, hash_algs[alg]()) for alg in hash_algs or {})

  fp = None
  # Tracks whether the server used a gzip encoding.
  server_encoding = None
  download_complete = False
  download_strategy = _SelectDownloadStrategy(src_obj_metadata, dst_url)
  download_start_point = 0
  # This is used for resuming downloads, but also for passing the mediaLink
  # and size into the download for new downloads so that we can avoid
  # making an extra HTTP call.
  serialization_data = None
  serialization_dict = GetDownloadSerializationDict(src_obj_metadata)
  try:
    if download_strategy is CloudApi.DownloadStrategy.ONE_SHOT:
      fp = open(download_file_name, 'wb')
    elif download_strategy is CloudApi.DownloadStrategy.RESUMABLE:
      # If this is a resumable download, we need to open the file for append and
      # manage a tracker file.
      fp = open(download_file_name, 'ab')

      resuming = _ReadOrCreateDownloadTrackerFile(
          src_obj_metadata, dst_url, api_selector)
      if resuming:
        # Find out how far along we are so we can request the appropriate
        # remaining range of the object.
        existing_file_size = GetFileSize(fp, position_to_eof=True)
        if existing_file_size > src_obj_metadata.size:
          _DeleteTrackerFile(GetTrackerFilePath(
              dst_url, TrackerFileType.DOWNLOAD, api_selector))
          raise CommandException(
              '%s is larger (%d) than %s (%d).\nDeleting tracker file, so '
              'if you re-try this download it will start from scratch' %
              (fp.name, existing_file_size, src_url.object_name,
               src_obj_metadata.size))
        else:
          if existing_file_size == src_obj_metadata.size:
            logger.info(
                'Download already complete for file %s, skipping download but '
                'will run integrity checks.', download_file_name)
            download_complete = True
          else:
            download_start_point = existing_file_size
            serialization_dict['progress'] = download_start_point
            logger.info('Resuming download for %s', src_url.url_string)
          # Catch up our digester with the hash data.
          if existing_file_size > TEN_MB:
            for alg_name in digesters:
              logger.info(
                  'Catching up %s for %s', alg_name, download_file_name)
          with open(download_file_name, 'rb') as hash_fp:
            while True:
              data = hash_fp.read(DEFAULT_FILE_BUFFER_SIZE)
              if not data:
                break
              for alg_name in digesters:
                digesters[alg_name].update(data)
      else:
        # Starting a new download, blow away whatever is already there.
        fp.truncate(0)

    else:
      # Note: the original message was missing the space between 'for' and
      # 'file', rendering as "chosen forfile".
      raise CommandException('Invalid download strategy %s chosen for '
                             'file %s' % (download_strategy, fp.name))

    if not dst_url.IsStream():
      serialization_data = json.dumps(serialization_dict)

    progress_callback = FileProgressCallbackHandler(
        ConstructAnnounceText('Downloading', dst_url.url_string),
        logger).call
    if global_copy_helper_opts.halt_at_byte:
      progress_callback = _HaltingCopyCallbackHandler(
          False, dst_url, global_copy_helper_opts.halt_at_byte, logger).call

    start_time = time.time()
    # TODO: With gzip encoding (which may occur on-the-fly and not be part of
    # the object's metadata), when we request a range to resume, it's possible
    # that the server will just resend the entire object, which means our
    # caught-up hash will be incorrect.  We recalculate the hash on
    # the local file in the case of a failed gzip hash anyway, but it would
    # be better if we actively detected this case.
    if not download_complete:
      server_encoding = gsutil_api.GetObjectMedia(
          src_url.bucket_name, src_url.object_name, fp,
          start_byte=download_start_point, generation=src_url.generation,
          object_size=src_obj_metadata.size,
          download_strategy=download_strategy, provider=src_url.scheme,
          serialization_data=serialization_data, digesters=digesters,
          progress_callback=progress_callback)

    end_time = time.time()

    # If a custom test method is defined, call it here. For the copy command,
    # test methods are expected to take one argument: an open file pointer,
    # and are used to perturb the open file during download to exercise
    # download error detection.
    if test_method:
      test_method(fp)
  except ResumableDownloadException as e:
    logger.warning('Caught ResumableDownloadException (%s) for file %s.',
                   e.reason, file_name)
    raise
  finally:
    if fp:
      fp.close()

  # If we decompressed a content-encoding gzip file on the fly, this may not
  # be accurate, but it is the best we can do without going deep into the
  # underlying HTTP libraries. Note that this value is only used for
  # reporting in log messages; inaccuracy doesn't impact the integrity of the
  # download.
  bytes_transferred = src_obj_metadata.size - download_start_point
  server_gzip = server_encoding and server_encoding.lower().endswith('gzip')
  local_md5 = _ValidateDownloadHashes(logger, src_url, src_obj_metadata,
                                      dst_url, need_to_unzip, server_gzip,
                                      digesters, hash_algs, api_selector,
                                      bytes_transferred)

  return (end_time - start_time, bytes_transferred, dst_url, local_md5)
| 1879 |
| 1880 |
| 1881 def _GetDownloadZipFileName(file_name): |
| 1882 """Returns the file name for a temporarily compressed downloaded file.""" |
| 1883 return '%s_.gztmp' % file_name |
| 1884 |
| 1885 |
def _ValidateDownloadHashes(logger, src_url, src_obj_metadata, dst_url,
                            need_to_unzip, server_gzip, digesters, hash_algs,
                            api_selector, bytes_transferred):
  """Validates a downloaded file's integrity.

  Args:
    logger: For outputting log messages.
    src_url: StorageUrl for the source object.
    src_obj_metadata: Metadata for the source object, potentially containing
                      hash values.
    dst_url: StorageUrl describing the destination file.
    need_to_unzip: If true, a temporary zip file was used and must be
                   uncompressed as part of validation.
    server_gzip: If true, the server gzipped the bytes (regardless of whether
                 the object metadata claimed it was gzipped).
    digesters: dict of {string, hash digester} that contains up-to-date digests
               computed during the download. If a digester for a particular
               algorithm is None, an up-to-date digest is not available and the
               hash must be recomputed from the local file.
    hash_algs: dict of {string, hash algorithm} that can be used if digesters
               don't have up-to-date digests.
    api_selector: The Cloud API implementation used (used tracker file naming).
    bytes_transferred: Number of bytes downloaded (used for logging).

  Returns:
    An MD5 of the local file, if one was calculated as part of the integrity
    check.

  Raises:
    CommandException: if the local digest doesn't match the cloud-supplied
        digest (the local file is deleted before raising).
    IOError: if decompressing the downloaded temp file fails for any reason
        other than the file not actually being gzipped.
  """
  file_name = dst_url.object_name
  download_file_name = (_GetDownloadZipFileName(file_name) if need_to_unzip else
                        file_name)
  digesters_succeeded = True
  for alg in digesters:
    # If we get a digester with a None algorithm, the underlying
    # implementation failed to calculate a digest, so we will need to
    # calculate one from scratch.
    if not digesters[alg]:
      digesters_succeeded = False
      break

  if digesters_succeeded:
    local_hashes = _CreateDigestsFromDigesters(digesters)
  else:
    local_hashes = _CreateDigestsFromLocalFile(
        logger, hash_algs, download_file_name, src_obj_metadata)

  digest_verified = True
  hash_invalid_exception = None
  try:
    _CheckHashes(logger, src_url, src_obj_metadata, download_file_name,
                 local_hashes)
    _DeleteTrackerFile(GetTrackerFilePath(
        dst_url, TrackerFileType.DOWNLOAD, api_selector))
  except CommandException as e:
    # If an non-gzipped object gets sent with gzip content encoding, the hash
    # we calculate will match the gzipped bytes, not the original object. Thus,
    # we'll need to calculate and check it after unzipping.
    if ('doesn\'t match cloud-supplied digest' in str(e) and
        (server_gzip or api_selector == ApiSelector.XML)):
      if server_gzip:
        logger.debug(
            'Hash did not match but server gzipped the content, will '
            'recalculate.')
      else:
        logger.debug(
            'Hash did not match but server may have gzipped the content, will '
            'recalculate.')
      # Save off the exception in case this isn't a gzipped file.
      hash_invalid_exception = e
      digest_verified = False
    else:
      _DeleteTrackerFile(GetTrackerFilePath(
          dst_url, TrackerFileType.DOWNLOAD, api_selector))
      os.unlink(file_name)
      raise

  if server_gzip and not need_to_unzip:
    # Server compressed bytes on-the-fly, thus we need to rename and decompress.
    # We can't decompress on-the-fly because prior to Python 3.2 the gzip
    # module makes a bunch of seek calls on the stream.
    download_file_name = _GetDownloadZipFileName(file_name)
    os.rename(file_name, download_file_name)

  if need_to_unzip or server_gzip:
    # Log that we're uncompressing if the file is big enough that
    # decompressing would make it look like the transfer "stalled" at the end.
    if bytes_transferred > TEN_MB:
      logger.info(
          'Uncompressing downloaded tmp file to %s...', file_name)

    # Downloaded gzipped file to a filename w/o .gz extension, so unzip.
    gzip_fp = None
    try:
      gzip_fp = gzip.open(download_file_name, 'rb')
      with open(file_name, 'wb') as f_out:
        data = gzip_fp.read(GZIP_CHUNK_SIZE)
        while data:
          f_out.write(data)
          data = gzip_fp.read(GZIP_CHUNK_SIZE)
    except IOError as e:
      # In the XML case where we don't know if the file was gzipped, raise
      # the original hash exception if we find that it wasn't.
      if 'Not a gzipped file' in str(e) and hash_invalid_exception:
        # Linter improperly thinks we're raising None despite the above check.
        # pylint: disable=raising-bad-type
        raise hash_invalid_exception
      # Any other decompression failure is a genuine error; re-raise rather
      # than silently continuing with a possibly-truncated file (the original
      # code swallowed such IOErrors).
      raise
    finally:
      if gzip_fp:
        gzip_fp.close()

    os.unlink(download_file_name)

  if not digest_verified:
    try:
      # Recalculate hashes on the unzipped local file.
      local_hashes = _CreateDigestsFromLocalFile(logger, hash_algs, file_name,
                                                 src_obj_metadata)
      _CheckHashes(logger, src_url, src_obj_metadata, file_name, local_hashes)
      _DeleteTrackerFile(GetTrackerFilePath(
          dst_url, TrackerFileType.DOWNLOAD, api_selector))
    except CommandException:
      _DeleteTrackerFile(GetTrackerFilePath(
          dst_url, TrackerFileType.DOWNLOAD, api_selector))
      os.unlink(file_name)
      raise

  if 'md5' in local_hashes:
    return local_hashes['md5']
| 2014 |
| 2015 |
def _CopyFileToFile(src_url, dst_url):
  """Copies a local file to a local file.

  Args:
    src_url: Source FileUrl.
    dst_url: Destination FileUrl.
  Returns:
    (elapsed_time, bytes_transferred, dst_url, md5=None).

  Raises:
    CommandException: if errors encountered.
  """
  src_fp = GetStreamFromFileUrl(src_url)
  dir_name = os.path.dirname(dst_url.object_name)
  if dir_name and not os.path.exists(dir_name):
    # Tolerate a concurrent process/thread creating the directory, matching
    # the race guard used in _DownloadObjectToFile for gsutil -m cp.
    try:
      os.makedirs(dir_name)
    except OSError as e:
      if e.errno != errno.EEXIST:
        raise
  start_time = time.time()
  # Close the destination via a context manager so buffered bytes are flushed
  # before we measure the resulting file size (the original code measured the
  # size while the write handle was still open, and never closed it).
  with open(dst_url.object_name, 'wb') as dst_fp:
    shutil.copyfileobj(src_fp, dst_fp)
  end_time = time.time()
  return (end_time - start_time, os.path.getsize(dst_url.object_name),
          dst_url, None)
| 2038 |
| 2039 |
| 2040 def _DummyTrackerCallback(_): |
| 2041 pass |
| 2042 |
| 2043 |
# pylint: disable=undefined-variable
def _CopyObjToObjDaisyChainMode(src_url, src_obj_metadata, dst_url,
                                dst_obj_metadata, preconditions, gsutil_api,
                                logger):
  """Copies from src_url to dst_url in "daisy chain" mode.

  See -D OPTION documentation about what daisy chain mode is.

  Args:
    src_url: Source CloudUrl
    src_obj_metadata: Metadata from source object
    dst_url: Destination CloudUrl
    dst_obj_metadata: Object-specific metadata that should be overridden during
                      the copy.
    preconditions: Preconditions to use for the copy.
    gsutil_api: gsutil Cloud API to use for the copy.
    logger: For outputting log messages.

  Returns:
    (elapsed_time, bytes_transferred, dst_url with generation,
    md5 hash of destination) excluding overhead like initial GET.

  Raises:
    CommandException: if errors encountered.
  """
  # We don't attempt to preserve ACLs across providers because
  # GCS and S3 support different ACLs and disjoint principals.
  if (global_copy_helper_opts.preserve_acl
      and src_url.scheme != dst_url.scheme):
    raise NotImplementedError(
        'Cross-provider cp -p not supported')
  if not global_copy_helper_opts.preserve_acl:
    dst_obj_metadata.acl = []

  start_time = time.time()
  upload_fp = DaisyChainWrapper(src_url, src_obj_metadata.size, gsutil_api)
  if src_obj_metadata.size == 0:
    # Resumable uploads of size 0 are not supported.
    uploaded_object = gsutil_api.UploadObject(
        upload_fp, object_metadata=dst_obj_metadata,
        canned_acl=global_copy_helper_opts.canned_acl,
        preconditions=preconditions, provider=dst_url.scheme,
        fields=UPLOAD_RETURN_FIELDS, size=src_obj_metadata.size)
  else:
    # TODO: Support process-break resumes. This will resume across connection
    # breaks and server errors, but the tracker callback is a no-op so this
    # won't resume across gsutil runs.
    uploaded_object = gsutil_api.UploadObjectResumable(
        upload_fp, object_metadata=dst_obj_metadata,
        canned_acl=global_copy_helper_opts.canned_acl,
        preconditions=preconditions, provider=dst_url.scheme,
        fields=UPLOAD_RETURN_FIELDS, size=src_obj_metadata.size,
        progress_callback=FileProgressCallbackHandler(
            ConstructAnnounceText('Uploading', dst_url.url_string),
            logger).call,
        tracker_callback=_DummyTrackerCallback)
  end_time = time.time()

  try:
    _CheckCloudHashes(logger, src_url, dst_url, src_obj_metadata,
                      uploaded_object)
  except CommandException as e:
    # If the digest doesn't match, delete the destination object.
    if 'doesn\'t match cloud-supplied digest' in str(e):
      gsutil_api.DeleteObject(dst_url.bucket_name, dst_url.object_name,
                              generation=uploaded_object.generation,
                              provider=dst_url.scheme)
    raise

  result_url = dst_url.Clone()
  result_url.generation = GenerationFromUrlAndString(
      result_url, uploaded_object.generation)

  return (end_time - start_time, src_obj_metadata.size, result_url,
          uploaded_object.md5Hash)
| 2118 |
| 2119 |
# pylint: disable=undefined-variable
# pylint: disable=too-many-statements
def PerformCopy(logger, src_url, dst_url, gsutil_api, command_obj,
                copy_exception_handler, allow_splitting=True,
                headers=None, manifest=None, gzip_exts=None, test_method=None):
  """Performs copy from src_url to dst_url, handling various special cases.

  Args:
    logger: for outputting log messages.
    src_url: Source StorageUrl.
    dst_url: Destination StorageUrl.
    gsutil_api: gsutil Cloud API instance to use for the copy.
    command_obj: command object for use in Apply in parallel composite uploads.
    copy_exception_handler: for handling copy exceptions during Apply.
    allow_splitting: Whether to allow the file to be split into component
                     pieces for a parallel composite upload.
    headers: optional headers to use for the copy operation.
    manifest: optional manifest for tracking copy operations.
    gzip_exts: List of file extensions to gzip for uploads, if any.
    test_method: optional test method for modifying files during unit tests.

  Returns:
    (elapsed_time, bytes_transferred, version-specific dst_url) excluding
    overhead like initial GET.

  Raises:
    ItemExistsError: if no clobber flag is specified and the destination
        object already exists.
    CommandException: if other errors encountered.
  """
  if headers:
    dst_obj_headers = headers.copy()
  else:
    dst_obj_headers = {}

  # Create a metadata instance for each destination object so metadata
  # such as content-type can be applied per-object.
  # Initialize metadata from any headers passed in via -h.
  dst_obj_metadata = ObjectMetadataFromHeaders(dst_obj_headers)

  if dst_url.IsCloudUrl() and dst_url.scheme == 'gs':
    preconditions = PreconditionsFromHeaders(dst_obj_headers)
  else:
    preconditions = Preconditions()

  src_obj_metadata = None
  src_obj_filestream = None
  if src_url.IsCloudUrl():
    src_obj_fields = None
    if dst_url.IsCloudUrl():
      # For cloud or daisy chain copy, we need every copyable field.
      # If we're not modifying or overriding any of the fields, we can get
      # away without retrieving the object metadata because the copy
      # operation can succeed with just the destination bucket and object
      # name. But if we are sending any metadata, the JSON API will expect a
      # complete object resource. Since we want metadata like the object size
      # for our own tracking, we just get all of the metadata here.
      src_obj_fields = ['cacheControl', 'componentCount',
                        'contentDisposition', 'contentEncoding',
                        'contentLanguage', 'contentType', 'crc32c',
                        'etag', 'generation', 'md5Hash', 'mediaLink',
                        'metadata', 'metageneration', 'size']
      # We only need the ACL if we're going to preserve it.
      if global_copy_helper_opts.preserve_acl:
        src_obj_fields.append('acl')
      if (src_url.scheme == dst_url.scheme
          and not global_copy_helper_opts.daisy_chain):
        copy_in_the_cloud = True
      else:
        copy_in_the_cloud = False
    else:
      # Just get the fields needed to validate the download.
      src_obj_fields = ['crc32c', 'contentEncoding', 'contentType', 'etag',
                        'mediaLink', 'md5Hash', 'size']
    try:
      src_generation = GenerationFromUrlAndString(src_url, src_url.generation)
      src_obj_metadata = gsutil_api.GetObjectMetadata(
          src_url.bucket_name, src_url.object_name,
          generation=src_generation, provider=src_url.scheme,
          fields=src_obj_fields)
    except NotFoundException:
      raise CommandException(
          'NotFoundException: Could not retrieve source object %s.' %
          src_url.url_string)
    src_obj_size = src_obj_metadata.size
    dst_obj_metadata.contentType = src_obj_metadata.contentType
    if global_copy_helper_opts.preserve_acl:
      dst_obj_metadata.acl = src_obj_metadata.acl
      # Special case for S3-to-S3 copy URLs using
      # global_copy_helper_opts.preserve_acl.
      # dst_url will be verified in _CopyObjToObjDaisyChainMode if it
      # is not s3 (and thus differs from src_url).
      if src_url.scheme == 's3':
        acl_text = S3MarkerAclFromObjectMetadata(src_obj_metadata)
        if acl_text:
          AddS3MarkerAclToObjectMetadata(dst_obj_metadata, acl_text)
  else:
    try:
      src_obj_filestream = GetStreamFromFileUrl(src_url)
    # Narrowed from a bare except so that KeyboardInterrupt/SystemExit
    # propagate instead of being misreported as a missing file.
    except Exception:  # pylint: disable=broad-except
      raise CommandException('"%s" does not exist.' % src_url)
    if src_url.IsStream():
      src_obj_size = None
    else:
      src_obj_size = os.path.getsize(src_url.object_name)

  if global_copy_helper_opts.use_manifest:
    # Set the source size in the manifest.
    manifest.Set(src_url.url_string, 'size', src_obj_size)

  # On Windows, stdin is opened as text mode instead of binary which causes
  # problems when piping a binary file, so this switches it to binary mode.
  if IS_WINDOWS and src_url.IsFileUrl() and src_url.IsStream():
    msvcrt.setmode(GetStreamFromFileUrl(src_url).fileno(), os.O_BINARY)

  if global_copy_helper_opts.no_clobber:
    # There are two checks to prevent clobbering:
    # 1) The first check is to see if the URL
    #    already exists at the destination and prevent the upload/download
    #    from happening. This is done by the exists() call.
    # 2) The second check is only relevant if we are writing to gs. We can
    #    enforce that the server only writes the object if it doesn't exist
    #    by specifying the header below. This check only happens at the
    #    server after the complete file has been uploaded. We specify this
    #    header to prevent a race condition where a destination file may
    #    be created after the first check and before the file is fully
    #    uploaded.
    # In order to save on unnecessary uploads/downloads we perform both
    # checks. However, this may come at the cost of additional HTTP calls.
    if preconditions.gen_match:
      raise ArgumentException('Specifying x-goog-if-generation-match is '
                              'not supported with cp -n')
    else:
      preconditions.gen_match = 0
    if dst_url.IsFileUrl() and os.path.exists(dst_url.object_name):
      # The local file may be a partial. Check the file sizes.
      if src_obj_size == os.path.getsize(dst_url.object_name):
        raise ItemExistsError()
    elif dst_url.IsCloudUrl():
      try:
        dst_object = gsutil_api.GetObjectMetadata(
            dst_url.bucket_name, dst_url.object_name, provider=dst_url.scheme)
      except NotFoundException:
        dst_object = None
      if dst_object:
        raise ItemExistsError()

  if dst_url.IsCloudUrl():
    # Cloud storage API gets object and bucket name from metadata.
    dst_obj_metadata.name = dst_url.object_name
    dst_obj_metadata.bucket = dst_url.bucket_name
    if src_url.IsCloudUrl():
      # Preserve relevant metadata from the source object if it's not already
      # provided from the headers.
      CopyObjectMetadata(src_obj_metadata, dst_obj_metadata, override=False)
    else:
      _SetContentTypeFromFile(src_url, dst_obj_metadata)
  else:
    # Files don't have Cloud API metadata.
    dst_obj_metadata = None

  _LogCopyOperation(logger, src_url, dst_url, dst_obj_metadata)

  if global_copy_helper_opts.canned_acl:
    # No canned ACL support in JSON, force XML API to be used for
    # upload/copy operations.
    orig_prefer_api = gsutil_api.prefer_api
    gsutil_api.prefer_api = ApiSelector.XML

  try:
    if src_url.IsCloudUrl():
      if dst_url.IsFileUrl():
        return _DownloadObjectToFile(src_url, src_obj_metadata, dst_url,
                                     gsutil_api, logger,
                                     test_method=test_method)
      elif copy_in_the_cloud:
        return _CopyObjToObjInTheCloud(src_url, src_obj_size, dst_url,
                                       dst_obj_metadata, preconditions,
                                       gsutil_api)
      else:
        return _CopyObjToObjDaisyChainMode(src_url, src_obj_metadata,
                                           dst_url, dst_obj_metadata,
                                           preconditions, gsutil_api, logger)
    else:  # src_url.IsFileUrl()
      if dst_url.IsCloudUrl():
        return _UploadFileToObject(
            src_url, src_obj_filestream, src_obj_size, dst_url,
            dst_obj_metadata, preconditions, gsutil_api, logger, command_obj,
            copy_exception_handler, gzip_exts=gzip_exts,
            allow_splitting=allow_splitting)
      else:  # dst_url.IsFileUrl()
        return _CopyFileToFile(src_url, dst_url)
  finally:
    if global_copy_helper_opts.canned_acl:
      gsutil_api.prefer_api = orig_prefer_api
| 2315 |
| 2316 |
class Manifest(object):
  """Stores the manifest items for the CpCommand class."""

  def __init__(self, path):
    # Maps source URL -> dict of per-copy fields (size, result, times, ...).
    self.items = {}
    # Maps source URL -> prior result ('OK' or 'skip') parsed from disk.
    self.manifest_filter = {}
    self.lock = CreateLock()

    self.manifest_path = os.path.expanduser(path)
    self._ParseManifest()
    self._CreateManifestFile()

  def _ParseManifest(self):
    """Load and parse a manifest file.

    This information will be used to skip any files that have a skip or OK
    status.
    """
    try:
      if not os.path.exists(self.manifest_path):
        return
      with open(self.manifest_path, 'rb') as fp:
        header_seen = False
        source_index = result_index = None
        for row in csv.reader(fp):
          if not header_seen:
            try:
              source_index = row.index('Source')
              result_index = row.index('Result')
            except ValueError:
              # No header and thus not a valid manifest file.
              raise CommandException(
                  'Missing headers in manifest file: %s' % self.manifest_path)
            header_seen = True
          status = row[result_index]
          if status in ('OK', 'skip'):
            # We're always guaranteed to take the last result of a specific
            # source url.
            self.manifest_filter[row[source_index]] = status
    except IOError:
      raise CommandException('Could not parse %s' % self.manifest_path)

  def WasSuccessful(self, src):
    """Returns whether the specified src url was marked as successful."""
    return src in self.manifest_filter

  def _CreateManifestFile(self):
    """Opens the manifest file and assigns it to the file pointer."""
    path = self.manifest_path
    try:
      if (not os.path.exists(path)) or os.stat(path).st_size == 0:
        # Brand new (or empty) manifest: write the header row.
        with open(path, 'wb', 1) as fp:
          csv.writer(fp).writerow(
              ['Source',
               'Destination',
               'Start',
               'End',
               'Md5',
               'UploadId',
               'Source Size',
               'Bytes Transferred',
               'Result',
               'Description'])
    except IOError:
      raise CommandException('Could not create manifest file.')

  def Set(self, url, key, value):
    # A None value carries no information, so bail out rather than clobber
    # any existing entry. To explicitly zero a field, pass '' instead.
    if value is None:
      return
    self.items.setdefault(url, {})[key] = value

  def Initialize(self, source_url, destination_url):
    # Always use the source_url as the key for the item. This is unique.
    for key, value in (('source_uri', source_url),
                       ('destination_uri', destination_url),
                       ('start_time', datetime.datetime.utcnow())):
      self.Set(source_url, key, value)

  def SetResult(self, source_url, bytes_transferred, result,
                description=''):
    for key, value in (('bytes', bytes_transferred),
                       ('result', result),
                       ('description', description),
                       ('end_time', datetime.datetime.utcnow())):
      self.Set(source_url, key, value)
    self._WriteRowToManifestFile(source_url)
    self._RemoveItemFromManifest(source_url)

  def _WriteRowToManifestFile(self, url):
    """Writes a manifest entry to the manifest file for the url argument."""
    item = self.items[url]
    row = [
        str(item['source_uri'].encode(UTF8)),
        str(item['destination_uri'].encode(UTF8)),
        '%sZ' % item['start_time'].isoformat(),
        '%sZ' % item['end_time'].isoformat(),
        item.get('md5', ''),
        item.get('upload_id', ''),
        str(item['size']) if 'size' in item else '',
        str(item['bytes']) if 'bytes' in item else '',
        item['result'],
        item['description'].encode(UTF8)]

    # Acquire a lock to prevent multiple threads writing to the same file at
    # the same time. This would cause a garbled mess in the manifest file.
    with self.lock:
      with open(self.manifest_path, 'a', 1) as fp:  # 1 == line buffered
        csv.writer(fp).writerow(row)

  def _RemoveItemFromManifest(self, url):
    # Drop the finished item so the in-memory dictionary doesn't keep
    # growing for no good reason.
    del self.items[url]
| 2438 |
| 2439 |
class ItemExistsError(Exception):
  """Raised to skip an object because it already exists at the destination."""
| 2443 |
| 2444 |
def GetPathBeforeFinalDir(url):
  """Returns the path section before the final directory component of the URL.

  This handles cases for file system directories, bucket, and bucket
  subdirectories. Example: for gs://bucket/dir/ we'll return 'gs://bucket',
  and for file://dir we'll return file://

  Args:
    url: StorageUrl representing a filesystem directory, cloud bucket or
        bucket subdir.

  Returns:
    String name of above-described path, sans final path separator.
  """
  delim = url.delim
  if url.IsFileUrl():
    remainder = url.url_string[len('file://'):]
    if delim not in remainder:
      return 'file://'
    return 'file://%s' % remainder.rstrip(delim).rpartition(delim)[0]
  if url.IsBucket():
    return '%s://' % url.scheme
  # Otherwise the URL names a bucket subdirectory.
  return url.url_string.rstrip(delim).rpartition(delim)[0]
| 2470 |
| 2471 |
def _HashFilename(filename):
  """Apply a hash function (SHA1) to shorten the passed file name.

  The spec for the hashed file name is as follows:

      TRACKER_<hash>_<trailing>

  where hash is a SHA1 hash on the original file name and trailing is
  the last 16 chars from the original file name. Max file name lengths
  vary by operating system so the goal of this function is to ensure
  the hashed version takes fewer than 100 characters.

  Args:
    filename: file name to be hashed.

  Returns:
    shorter, hashed version of passed file name
  """
  # Normalize to UTF8-encoded bytes before hashing.
  if isinstance(filename, unicode):
    encoded = filename.encode(UTF8)
  else:
    encoded = unicode(filename, UTF8).encode(UTF8)
  digest = hashlib.sha1(encoded).hexdigest()
  return 'TRACKER_%s.%s' % (digest, encoded[-16:])
| 2496 |
| 2497 |
| 2498 def _DivideAndCeil(dividend, divisor): |
| 2499 """Returns ceil(dividend / divisor). |
| 2500 |
| 2501 Takes care to avoid the pitfalls of floating point arithmetic that could |
| 2502 otherwise yield the wrong result for large numbers. |
| 2503 |
| 2504 Args: |
| 2505 dividend: Dividend for the operation. |
| 2506 divisor: Divisor for the operation. |
| 2507 |
| 2508 Returns: |
| 2509 Quotient. |
| 2510 """ |
| 2511 quotient = dividend // divisor |
| 2512 if (dividend % divisor) != 0: |
| 2513 quotient += 1 |
| 2514 return quotient |
| 2515 |
| 2516 |
def _GetPartitionInfo(file_size, max_components, default_component_size):
  """Gets info about a file partition for parallel composite uploads.

  Args:
    file_size: The number of bytes in the file to be partitioned.
    max_components: The maximum number of components that can be composed.
    default_component_size: The size of a component, assuming that
                            max_components is infinite.
  Returns:
    The number of components in the partitioned file, and the size of each
    component (except the last, which will have a different size iff
    file_size != 0 (mod num_components)).
  """
  # Ideal count if there were no limit on the number of components.
  ideal_count = _DivideAndCeil(file_size, default_component_size)

  # Clamp the count to the range [2, max_components].
  num_components = max(2, min(ideal_count, max_components))

  # Spread the file evenly across the chosen number of components.
  component_size = _DivideAndCeil(file_size, num_components)
  return (num_components, component_size)
| 2539 |
| 2540 |
def _DeleteObjectFn(cls, url_to_delete, thread_state=None):
  """Deletes a single object; wrapper function for use with command.Apply()."""
  api = GetCloudApiInstance(cls, thread_state)
  api.DeleteObject(url_to_delete.bucket_name,
                   url_to_delete.object_name,
                   generation=url_to_delete.generation,
                   provider=url_to_delete.scheme)
| 2547 |
| 2548 |
def _ParseParallelUploadTrackerFile(tracker_file, tracker_file_lock):
  """Parse the tracker file from the last parallel composite upload attempt.

  If it exists, the tracker file is of the format described in
  _CreateParallelUploadTrackerFile. If the file doesn't exist or cannot be
  read, then the upload will start from the beginning.

  Args:
    tracker_file: The name of the file to parse.
    tracker_file_lock: Lock protecting access to the tracker file.

  Returns:
    random_prefix: A randomly-generated prefix to the name of the
                   temporary components.
    existing_objects: A list of ObjectFromTracker objects representing
                      the set of files that have already been uploaded.
  """
  existing_objects = []
  try:
    with tracker_file_lock:
      # Use a context manager so the file handle is closed even if reading
      # raises partway through (the previous open/close pair leaked the
      # handle on error).
      with open(tracker_file, 'r') as f:
        lines = [line.strip() for line in f.readlines()]
  except IOError as e:
    # We can't read the tracker file, so generate a new random prefix.
    lines = [str(random.randint(1, (10 ** 10) - 1))]

    # Ignore non-existent file (happens first time an upload
    # is attempted on a file), but warn user for other errors.
    if e.errno != errno.ENOENT:
      # Will restart because we failed to read in the file.
      print('Couldn\'t read parallel upload tracker file (%s): %s. '
            'Restarting upload from scratch.' % (tracker_file, e.strerror))

  # The first line contains the randomly-generated prefix.
  random_prefix = lines[0]

  # The remaining lines were written in pairs to describe a single component
  # in the form:
  #   object_name (without random prefix)
  #   generation
  # Newlines are used as the delimiter because only newlines and carriage
  # returns are invalid characters in object names, and users can specify
  # a custom prefix in the config file.
  i = 1
  while i < len(lines):
    (name, generation) = (lines[i], lines[i+1])
    if not generation:
      # Cover the '' case.
      generation = None
    existing_objects.append(ObjectFromTracker(name, generation))
    i += 2
  return (random_prefix, existing_objects)
| 2603 |
| 2604 |
def _AppendComponentTrackerToParallelUploadTrackerFile(tracker_file, component,
                                                       tracker_file_lock):
  """Appends info about the uploaded component to an existing tracker file.

  Follows the format described in _CreateParallelUploadTrackerFile.

  Args:
    tracker_file: Tracker file to append to.
    component: Component that was uploaded.
    tracker_file_lock: Thread and process-safe Lock for the tracker file.
  """
  new_lines = ['%s\n' % line for line in
               _GetParallelUploadTrackerFileLinesForComponents([component])]
  with tracker_file_lock:
    with open(tracker_file, 'a') as f:
      f.writelines(new_lines)
| 2621 |
| 2622 |
def _CreateParallelUploadTrackerFile(tracker_file, random_prefix, components,
                                     tracker_file_lock):
  """Writes information about components that were successfully uploaded.

  This way the upload can be resumed at a later date. The tracker file has
  the format:
    random_prefix
    temp_object_1_name
    temp_object_1_generation
    .
    .
    .
    temp_object_N_name
    temp_object_N_generation
  where N is the number of components that have been successfully uploaded.

  Args:
    tracker_file: The name of the parallel upload tracker file.
    random_prefix: The randomly-generated prefix that was used for
                   for uploading any existing components.
    components: A list of ObjectFromTracker objects that were uploaded.
    tracker_file_lock: The lock protecting access to the tracker file.
  """
  lines = [random_prefix]
  lines += _GetParallelUploadTrackerFileLinesForComponents(components)
  lines = [line + '\n' for line in lines]
  try:
    with tracker_file_lock:
      # Mode 'w' truncates any existing contents, so a separate
      # open/close call to clear the file first is unnecessary.
      with open(tracker_file, 'w') as f:
        f.writelines(lines)
  except IOError as e:
    raise CommandException(TRACKER_FILE_UNWRITABLE_EXCEPTION_TEXT %
                           (tracker_file, e.strerror))
| 2657 |
| 2658 |
| 2659 def _GetParallelUploadTrackerFileLinesForComponents(components): |
| 2660 """Return a list of the lines for use in a parallel upload tracker file. |
| 2661 |
| 2662 The lines represent the given components, using the format as described in |
| 2663 _CreateParallelUploadTrackerFile. |
| 2664 |
| 2665 Args: |
| 2666 components: A list of ObjectFromTracker objects that were uploaded. |
| 2667 |
| 2668 Returns: |
| 2669 Lines describing components with their generation for outputting to the |
| 2670 tracker file. |
| 2671 """ |
| 2672 lines = [] |
| 2673 for component in components: |
| 2674 generation = None |
| 2675 generation = component.generation |
| 2676 if not generation: |
| 2677 generation = '' |
| 2678 lines += [component.object_name, str(generation)] |
| 2679 return lines |
| 2680 |
| 2681 |
def FilterExistingComponents(dst_args, existing_components, bucket_url,
                             gsutil_api):
  """Determines course of action for component objects.

  Given the list of all target objects based on partitioning the file and
  the list of objects that have already been uploaded successfully,
  this function determines which objects should be uploaded, which
  existing components are still valid, and which existing components should
  be deleted.

  Args:
    dst_args: The map of file_name -> PerformParallelUploadFileToObjectArgs
              calculated by partitioning the file.
    existing_components: A list of ObjectFromTracker objects that have been
                         uploaded in the past.
    bucket_url: CloudUrl of the bucket in which the components exist.
    gsutil_api: gsutil Cloud API instance to use for retrieving object metadata.

  Returns:
    components_to_upload: List of components that need to be uploaded.
    uploaded_components: List of components that have already been
                         uploaded and are still valid.
    existing_objects_to_delete: List of components that have already
                                been uploaded, but are no longer valid
                                and are in a versioned bucket, and
                                therefore should be deleted.
  """
  components_to_upload = []
  # Use a set for O(1) membership tests (was a list with O(n) lookups).
  existing_component_names = set(component.object_name
                                 for component in existing_components)
  for component_name in dst_args:
    if component_name not in existing_component_names:
      components_to_upload.append(dst_args[component_name])

  # Also a set: only membership is ever tested.
  objects_already_chosen = set()

  # Don't reuse any temporary components whose MD5 doesn't match the current
  # MD5 of the corresponding part of the file. If the bucket is versioned,
  # also make sure that we delete the existing temporary version.
  existing_objects_to_delete = []
  uploaded_components = []
  for tracker_object in existing_components:
    # Test membership on the dict directly instead of materializing .keys().
    if (tracker_object.object_name not in dst_args
        or tracker_object.object_name in objects_already_chosen):
      # This could happen if the component size has changed. This also serves
      # to handle object names that get duplicated in the tracker file due
      # to people doing things they shouldn't (e.g., overwriting an existing
      # temporary component in a versioned bucket).

      url = bucket_url.Clone()
      url.object_name = tracker_object.object_name
      url.generation = tracker_object.generation
      existing_objects_to_delete.append(url)
      continue

    dst_arg = dst_args[tracker_object.object_name]
    file_part = FilePart(dst_arg.filename, dst_arg.file_start,
                         dst_arg.file_length)
    # TODO: calculate MD5's in parallel when possible.
    content_md5 = CalculateB64EncodedMd5FromContents(file_part)

    try:
      # Get the MD5 of the currently-existing component.
      dst_url = dst_arg.dst_url
      dst_metadata = gsutil_api.GetObjectMetadata(
          dst_url.bucket_name, dst_url.object_name,
          generation=dst_url.generation, provider=dst_url.scheme,
          fields=['md5Hash', 'etag'])
      cloud_md5 = dst_metadata.md5Hash
    except Exception:  # pylint: disable=broad-except
      # We don't actually care what went wrong - we couldn't retrieve the
      # object to check the MD5, so just upload it again.
      cloud_md5 = None

    if cloud_md5 != content_md5:
      components_to_upload.append(dst_arg)
      objects_already_chosen.add(tracker_object.object_name)
      if tracker_object.generation:
        # If the old object doesn't have a generation (i.e., it isn't in a
        # versioned bucket), then we will just overwrite it anyway.
        invalid_component_with_generation = dst_arg.dst_url.Clone()
        invalid_component_with_generation.generation = tracker_object.generation
        existing_objects_to_delete.append(invalid_component_with_generation)
    else:
      url = dst_arg.dst_url.Clone()
      url.generation = tracker_object.generation
      uploaded_components.append(url)
      objects_already_chosen.add(tracker_object.object_name)

  if uploaded_components:
    logging.info('Found %d existing temporary components to reuse.',
                 len(uploaded_components))

  return (components_to_upload, uploaded_components,
          existing_objects_to_delete)
| OLD | NEW |