| OLD | NEW |
| 1 # -*- coding: utf-8 -*- |
| 1 # Copyright 2011 Google Inc. All Rights Reserved. | 2 # Copyright 2011 Google Inc. All Rights Reserved. |
| 2 # Copyright 2011, Nexenta Systems Inc. | 3 # Copyright 2011, Nexenta Systems Inc. |
| 3 # | 4 # |
| 4 # Licensed under the Apache License, Version 2.0 (the "License"); | 5 # Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 # you may not use this file except in compliance with the License. | 6 # you may not use this file except in compliance with the License. |
| 6 # You may obtain a copy of the License at | 7 # You may obtain a copy of the License at |
| 7 # | 8 # |
| 8 # http://www.apache.org/licenses/LICENSE-2.0 | 9 # http://www.apache.org/licenses/LICENSE-2.0 |
| 9 # | 10 # |
| 10 # Unless required by applicable law or agreed to in writing, software | 11 # Unless required by applicable law or agreed to in writing, software |
| 11 # distributed under the License is distributed on an "AS IS" BASIS, | 12 # distributed under the License is distributed on an "AS IS" BASIS, |
| 12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 # See the License for the specific language governing permissions and | 14 # See the License for the specific language governing permissions and |
| 14 # limitations under the License. | 15 # limitations under the License. |
| 16 """Implementation of Unix-like cp command for cloud storage providers.""" |
| 15 | 17 |
| 16 # Get the system logging module, not our local logging module. | |
| 17 from __future__ import absolute_import | 18 from __future__ import absolute_import |
| 18 | 19 |
| 19 import binascii | |
| 20 import boto | |
| 21 import copy | |
| 22 import crcmod | |
| 23 import csv | |
| 24 import datetime | |
| 25 import errno | |
| 26 import gslib | |
| 27 import gzip | |
| 28 import hashlib | |
| 29 import logging | |
| 30 import mimetypes | |
| 31 import mmap | |
| 32 import multiprocessing | |
| 33 import os | 20 import os |
| 34 import platform | |
| 35 import random | |
| 36 import re | |
| 37 import stat | |
| 38 import subprocess | |
| 39 import sys | |
| 40 import tempfile | |
| 41 import textwrap | |
| 42 import threading | |
| 43 import time | 21 import time |
| 44 import traceback | 22 import traceback |
| 45 | 23 |
| 46 from gslib.util import AddAcceptEncoding | 24 from gslib import copy_helper |
| 47 | 25 from gslib.cat_helper import CatHelper |
| 48 try: | 26 from gslib.cloud_api import AccessDeniedException |
| 49 from hashlib import md5 | 27 from gslib.cloud_api import NotFoundException |
| 50 except ImportError: | |
| 51 from md5 import md5 | |
| 52 | |
| 53 from boto import config | |
| 54 from boto.exception import GSResponseError | |
| 55 from boto.exception import ResumableUploadException | |
| 56 from boto.gs.resumable_upload_handler import ResumableUploadHandler | |
| 57 from boto.s3.keyfile import KeyFile | |
| 58 from boto.s3.resumable_download_handler import ResumableDownloadHandler | |
| 59 from boto.storage_uri import BucketStorageUri | |
| 60 from boto.storage_uri import StorageUri | |
| 61 from collections import namedtuple | |
| 62 from gslib.bucket_listing_ref import BucketListingRef | |
| 63 from gslib.command import COMMAND_NAME | |
| 64 from gslib.command import COMMAND_NAME_ALIASES | |
| 65 from gslib.command import Command | 28 from gslib.command import Command |
| 66 from gslib.command import FILE_URIS_OK | |
| 67 from gslib.command import MAX_ARGS | |
| 68 from gslib.command import MIN_ARGS | |
| 69 from gslib.command import PROVIDER_URIS_OK | |
| 70 from gslib.command import SUPPORTED_SUB_ARGS | |
| 71 from gslib.command import URIS_START_ARG | |
| 72 from gslib.commands.compose import MAX_COMPONENT_COUNT | 29 from gslib.commands.compose import MAX_COMPONENT_COUNT |
| 73 from gslib.commands.compose import MAX_COMPOSE_ARITY | 30 from gslib.copy_helper import CreateCopyHelperOpts |
| 74 from gslib.commands.config import DEFAULT_PARALLEL_COMPOSITE_UPLOAD_COMPONENT_SI
ZE | 31 from gslib.copy_helper import ItemExistsError |
| 75 from gslib.commands.config import DEFAULT_PARALLEL_COMPOSITE_UPLOAD_THRESHOLD | 32 from gslib.copy_helper import Manifest |
| 33 from gslib.copy_helper import PARALLEL_UPLOAD_TEMP_NAMESPACE |
| 34 from gslib.cs_api_map import ApiSelector |
| 76 from gslib.exception import CommandException | 35 from gslib.exception import CommandException |
| 77 from gslib.file_part import FilePart | |
| 78 from gslib.help_provider import HELP_NAME | |
| 79 from gslib.help_provider import HELP_NAME_ALIASES | |
| 80 from gslib.help_provider import HELP_ONE_LINE_SUMMARY | |
| 81 from gslib.help_provider import HELP_TEXT | |
| 82 from gslib.help_provider import HelpType | |
| 83 from gslib.help_provider import HELP_TYPE | |
| 84 from gslib.name_expansion import NameExpansionIterator | 36 from gslib.name_expansion import NameExpansionIterator |
| 85 from gslib.util import BOTO_IS_SECURE | 37 from gslib.storage_url import ContainsWildcard |
| 86 from gslib.util import CreateLock | 38 from gslib.util import CreateLock |
| 87 from gslib.util import CreateTrackerDirIfNeeded | 39 from gslib.util import GetCloudApiInstance |
| 88 from gslib.util import GetConfigFilePath | 40 from gslib.util import IsCloudSubdirPlaceholder |
| 89 from gslib.util import ParseErrorDetail | |
| 90 from gslib.util import HumanReadableToBytes | |
| 91 from gslib.util import IS_WINDOWS | |
| 92 from gslib.util import MakeHumanReadable | 41 from gslib.util import MakeHumanReadable |
| 93 from gslib.util import NO_MAX | 42 from gslib.util import NO_MAX |
| 94 from gslib.util import TWO_MB | 43 from gslib.util import RemoveCRLFFromString |
| 95 from gslib.util import UsingCrcmodExtension | |
| 96 from gslib.wildcard_iterator import ContainsWildcard | |
| 97 from gslib.name_expansion import NameExpansionResult | |
| 98 | |
| 99 | |
| 100 SLOW_CRC_WARNING = """ | |
| 101 WARNING: Downloading this composite object requires integrity checking with | |
| 102 CRC32c, but your crcmod installation isn't using the module's C extension, so | |
| 103 the the hash computation will likely throttle download performance. For help | |
| 104 installing the extension, please see: | |
| 105 $ gsutil help crcmod | |
| 106 To disable slow integrity checking, see the "check_hashes" option in your boto | |
| 107 config file. | |
| 108 """ | |
| 109 | |
| 110 SLOW_CRC_EXCEPTION = CommandException( | |
| 111 """ | |
| 112 Downloading this composite object requires integrity checking with CRC32c, but | |
| 113 your crcmod installation isn't using the module's C extension, so the the hash | |
| 114 computation will likely throttle download performance. For help installing the | |
| 115 extension, please see: | |
| 116 $ gsutil help crcmod | |
| 117 To download regardless of crcmod performance or to skip slow integrity checks, | |
| 118 see the "check_hashes" option in your boto config file.""") | |
| 119 | |
| 120 NO_HASH_CHECK_WARNING = """ | |
| 121 WARNING: This download will not be validated since your crcmod installation | |
| 122 doesn't use the module's C extension, so the hash computation would likely | |
| 123 throttle download performance. For help in installing the extension, please see: | |
| 124 $ gsutil help crcmod | |
| 125 To force integrity checking, see the "check_hashes" option in your boto config | |
| 126 file. | |
| 127 """ | |
| 128 | |
| 129 NO_SERVER_HASH_EXCEPTION = CommandException( | |
| 130 """ | |
| 131 This object has no server-supplied hash for performing integrity | |
| 132 checks. To skip integrity checking for such objects, see the "check_hashes" | |
| 133 option in your boto config file.""") | |
| 134 | |
| 135 NO_SERVER_HASH_WARNING = """ | |
| 136 WARNING: This object has no server-supplied hash for performing integrity | |
| 137 checks. To force integrity checking, see the "check_hashes" option in your boto | |
| 138 config file. | |
| 139 """ | |
| 140 | |
| 141 PARALLEL_UPLOAD_TEMP_NAMESPACE = ( | |
| 142 u'/gsutil/tmp/parallel_composite_uploads/for_details_see/gsutil_help_cp/') | |
| 143 | |
| 144 PARALLEL_UPLOAD_STATIC_SALT = u""" | |
| 145 PARALLEL_UPLOAD_SALT_TO_PREVENT_COLLISIONS. | |
| 146 The theory is that no user will have prepended this to the front of | |
| 147 one of their object names and then done an MD5 hash of the name, and | |
| 148 then prepended PARALLEL_UPLOAD_TEMP_NAMESPACE to the front of their object | |
| 149 name. Note that there will be no problems with object name length since we | |
| 150 hash the original name. | |
| 151 """ | |
| 152 | |
| 153 # In order to prevent people from uploading thousands of tiny files in parallel | |
| 154 # (which, apart from being useless, is likely to cause them to be throttled | |
| 155 # for the compose calls), don't allow files smaller than this to use parallel | |
| 156 # composite uploads. | |
| 157 MIN_PARALLEL_COMPOSITE_FILE_SIZE = 20971520 # 20 MB | |
| 158 | 44 |
| 159 SYNOPSIS_TEXT = """ | 45 SYNOPSIS_TEXT = """ |
| 160 <B>SYNOPSIS</B> | 46 <B>SYNOPSIS</B> |
| 161 gsutil cp [OPTION]... src_uri dst_uri | 47 gsutil cp [OPTION]... src_url dst_url |
| 162 gsutil cp [OPTION]... src_uri... dst_uri | 48 gsutil cp [OPTION]... src_url... dst_url |
| 163 gsutil cp [OPTION]... -I dst_uri | 49 gsutil cp [OPTION]... -I dst_url |
| 164 """ | 50 """ |
| 165 | 51 |
| 166 DESCRIPTION_TEXT = """ | 52 DESCRIPTION_TEXT = """ |
| 167 <B>DESCRIPTION</B> | 53 <B>DESCRIPTION</B> |
| 168 The gsutil cp command allows you to copy data between your local file | 54 The gsutil cp command allows you to copy data between your local file |
| 169 system and the cloud, copy data within the cloud, and copy data between | 55 system and the cloud, copy data within the cloud, and copy data between |
| 170 cloud storage providers. For example, to copy all text files from the | 56 cloud storage providers. For example, to copy all text files from the |
| 171 local directory to a bucket you could do: | 57 local directory to a bucket you could do: |
| 172 | 58 |
| 173 gsutil cp *.txt gs://my_bucket | 59 gsutil cp *.txt gs://my_bucket |
| 174 | 60 |
| 175 Similarly, you can download text files from a bucket by doing: | 61 Similarly, you can download text files from a bucket by doing: |
| 176 | 62 |
| 177 gsutil cp gs://my_bucket/*.txt . | 63 gsutil cp gs://my_bucket/*.txt . |
| 178 | 64 |
| 179 If you want to copy an entire directory tree you need to use the -R option: | 65 If you want to copy an entire directory tree you need to use the -R option: |
| 180 | 66 |
| 181 gsutil cp -R dir gs://my_bucket | 67 gsutil cp -R dir gs://my_bucket |
| 182 | 68 |
| 183 If you have a large number of files to upload you might want to use the | 69 If you have a large number of files to upload you might want to use the |
| 184 gsutil -m option, to perform a parallel (multi-threaded/multi-processing) | 70 gsutil -m option, to perform a parallel (multi-threaded/multi-processing) |
| 185 copy: | 71 copy: |
| 186 | 72 |
| 187 gsutil -m cp -R dir gs://my_bucket | 73 gsutil -m cp -R dir gs://my_bucket |
| 188 | 74 |
| 189 You can pass a list of URIs to copy on STDIN instead of as command line | 75 You can pass a list of URLs (one per line) to copy on STDIN instead of as |
| 190 arguments by using the -I option. This allows you to use gsutil in a | 76 command line arguments by using the -I option. This allows you to use gsutil |
| 191 pipeline to copy files and objects as generated by a program, such as: | 77 in a pipeline to upload or download files / objects as generated by a program, |
| 78 such as: |
| 192 | 79 |
| 193 some_program | gsutil -m cp -I gs://my_bucket | 80 some_program | gsutil -m cp -I gs://my_bucket |
| 194 | 81 |
| 195 The contents of STDIN can name files, cloud URIs, and wildcards of files | 82 or: |
| 196 and cloud URIs. | 83 |
| 84 some_program | gsutil -m cp -I ./download_dir |
| 85 |
| 86 The contents of STDIN can name files, cloud URLs, and wildcards of files |
| 87 and cloud URLs. |
| 197 """ | 88 """ |
| 198 | 89 |
| 199 NAME_CONSTRUCTION_TEXT = """ | 90 NAME_CONSTRUCTION_TEXT = """ |
| 200 <B>HOW NAMES ARE CONSTRUCTED</B> | 91 <B>HOW NAMES ARE CONSTRUCTED</B> |
| 201 The gsutil cp command strives to name objects in a way consistent with how | 92 The gsutil cp command strives to name objects in a way consistent with how |
| 202 Linux cp works, which causes names to be constructed in varying ways depending | 93 Linux cp works, which causes names to be constructed in varying ways depending |
| 203 on whether you're performing a recursive directory copy or copying | 94 on whether you're performing a recursive directory copy or copying |
| 204 individually named objects; and whether you're copying to an existing or | 95 individually named objects; and whether you're copying to an existing or |
| 205 non-existent directory. | 96 non-existent directory. |
| 206 | 97 |
| (...skipping 23 matching lines...) Expand all Loading... |
| 230 | 121 |
| 231 There's an additional wrinkle when working with subdirectories: the resulting | 122 There's an additional wrinkle when working with subdirectories: the resulting |
| 232 names depend on whether the destination subdirectory exists. For example, | 123 names depend on whether the destination subdirectory exists. For example, |
| 233 if gs://my_bucket/subdir exists as a subdirectory, the command: | 124 if gs://my_bucket/subdir exists as a subdirectory, the command: |
| 234 | 125 |
| 235 gsutil cp -R dir1/dir2 gs://my_bucket/subdir | 126 gsutil cp -R dir1/dir2 gs://my_bucket/subdir |
| 236 | 127 |
| 237 will create objects named like gs://my_bucket/subdir/dir2/a/b/c. In contrast, | 128 will create objects named like gs://my_bucket/subdir/dir2/a/b/c. In contrast, |
| 238 if gs://my_bucket/subdir does not exist, this same gsutil cp command will | 129 if gs://my_bucket/subdir does not exist, this same gsutil cp command will |
| 239 create objects named like gs://my_bucket/subdir/a/b/c. | 130 create objects named like gs://my_bucket/subdir/a/b/c. |
| 131 |
| 132 Note: If you use the |
| 133 `Google Developers Console <https://console.developers.google.com>`_ |
| 134 to create folders, it does so by creating a "placeholder" object that ends |
| 135 with a "/" character. gsutil skips these objects when downloading from the |
| 136 cloud to the local file system, because attempting to create a file that |
| 137 ends with a "/" is not allowed on Linux and MacOS. Because of this, it is |
| 138 recommended that you not create objects that end with "/" (unless you don't |
| 139 need to be able to download such objects using gsutil). |
| 240 """ | 140 """ |
| 241 | 141 |
| 242 SUBDIRECTORIES_TEXT = """ | 142 SUBDIRECTORIES_TEXT = """ |
| 243 <B>COPYING TO/FROM SUBDIRECTORIES; DISTRIBUTING TRANSFERS ACROSS MACHINES</B> | 143 <B>COPYING TO/FROM SUBDIRECTORIES; DISTRIBUTING TRANSFERS ACROSS MACHINES</B> |
| 244 You can use gsutil to copy to and from subdirectories by using a command | 144 You can use gsutil to copy to and from subdirectories by using a command |
| 245 like: | 145 like: |
| 246 | 146 |
| 247 gsutil cp -R dir gs://my_bucket/data | 147 gsutil cp -R dir gs://my_bucket/data |
| 248 | 148 |
| 249 This will cause dir and all of its files and nested subdirectories to be | 149 This will cause dir and all of its files and nested subdirectories to be |
| (...skipping 25 matching lines...) Expand all Loading... |
| 275 gsutil -m cp -R gs://my_bucket/data/result_set_[7-9]* dir | 175 gsutil -m cp -R gs://my_bucket/data/result_set_[7-9]* dir |
| 276 | 176 |
| 277 Note that dir could be a local directory on each machine, or it could | 177 Note that dir could be a local directory on each machine, or it could |
| 278 be a directory mounted off of a shared file server; whether the latter | 178 be a directory mounted off of a shared file server; whether the latter |
| 279 performs acceptably may depend on a number of things, so we recommend | 179 performs acceptably may depend on a number of things, so we recommend |
| 280 you experiment and find out what works best for you. | 180 you experiment and find out what works best for you. |
| 281 """ | 181 """ |
| 282 | 182 |
| 283 COPY_IN_CLOUD_TEXT = """ | 183 COPY_IN_CLOUD_TEXT = """ |
| 284 <B>COPYING IN THE CLOUD AND METADATA PRESERVATION</B> | 184 <B>COPYING IN THE CLOUD AND METADATA PRESERVATION</B> |
| 285 If both the source and destination URI are cloud URIs from the same | 185 If both the source and destination URL are cloud URLs from the same |
| 286 provider, gsutil copies data "in the cloud" (i.e., without downloading | 186 provider, gsutil copies data "in the cloud" (i.e., without downloading |
| 287 to and uploading from the machine where you run gsutil). In addition to | 187 to and uploading from the machine where you run gsutil). In addition to |
| 288 the performance and cost advantages of doing this, copying in the cloud | 188 the performance and cost advantages of doing this, copying in the cloud |
| 289 preserves metadata (like Content-Type and Cache-Control). In contrast, | 189 preserves metadata (like Content-Type and Cache-Control). In contrast, |
| 290 when you download data from the cloud it ends up in a file, which has | 190 when you download data from the cloud it ends up in a file, which has |
| 291 no associated metadata. Thus, unless you have some way to hold on to | 191 no associated metadata. Thus, unless you have some way to hold on to |
| 292 or re-create that metadata, downloading to a file will not retain the | 192 or re-create that metadata, downloading to a file will not retain the |
| 293 metadata. | 193 metadata. |
| 294 | 194 |
| 295 Note that by default, the gsutil cp command does not copy the object | 195 Note that by default, the gsutil cp command does not copy the object |
| 296 ACL to the new object, and instead will use the default bucket ACL (see | 196 ACL to the new object, and instead will use the default bucket ACL (see |
| 297 "gsutil help defacl"). You can override this behavior with the -p | 197 "gsutil help defacl"). You can override this behavior with the -p |
| 298 option (see OPTIONS below). | 198 option (see OPTIONS below). |
| 199 |
| 200 One additional note about copying in the cloud: If the destination bucket has |
| 201 versioning enabled, gsutil cp will copy all versions of the source object(s). |
| 202 For example: |
| 203 |
| 204 gsutil cp gs://bucket1/obj gs://bucket2 |
| 205 |
| 206 will cause all versions of gs://bucket1/obj to be copied to gs://bucket2. |
| 207 """ |
| 208 |
| 209 FAILURE_HANDLING_TEXT = """ |
| 210 <B>CHECKSUM VALIDATION AND FAILURE HANDLING</B> |
| 211 At the end of every upload or download, the gsutil cp command validates that |
| 212 the checksum of the source file/object matches the checksum of the |
| 213 destination file/object. If the checksums do not match, gsutil will delete |
| 214 the invalid copy and print a warning message. This very rarely happens, but |
| 215 if it does, please contact gs-team@google.com. |
| 216 |
| 217 The cp command will retry when failures occur, but if enough failures happen |
| 218 during a particular copy or delete operation the command will skip that object |
| 219 and move on. At the end of the copy run if any failures were not successfully |
| 220 retried, the cp command will report the count of failures, and exit with |
| 221 non-zero status. |
| 222 |
| 223 Note that there are cases where retrying will never succeed, such as if you |
| 224 don't have write permission to the destination bucket or if the destination |
| 225 path for some objects is longer than the maximum allowed length. |
| 226 |
| 227 For more details about gsutil's retry handling, please see |
| 228 "gsutil help retries". |
| 299 """ | 229 """ |
| 300 | 230 |
| 301 RESUMABLE_TRANSFERS_TEXT = """ | 231 RESUMABLE_TRANSFERS_TEXT = """ |
| 302 <B>RESUMABLE TRANSFERS</B> | 232 <B>RESUMABLE TRANSFERS</B> |
| 303 gsutil automatically uses the Google Cloud Storage resumable upload | 233 gsutil automatically uses the Google Cloud Storage resumable upload feature |
| 304 feature whenever you use the cp command to upload an object that is larger | 234 whenever you use the cp command to upload an object that is larger than 2 |
| 305 than 2 MB. You do not need to specify any special command line options | 235 MB. You do not need to specify any special command line options to make this |
| 306 to make this happen. If your upload is interrupted you can restart the | 236 happen. If your upload is interrupted you can restart the upload by running |
| 307 upload by running the same cp command that you ran to start the upload. | 237 the same cp command that you ran to start the upload. Until the upload |
| 238 has completed successfully, it will not be visible at the destination object |
| 239 and will not replace any existing object the upload is intended to overwrite. |
| 240 (However, see the section on PARALLEL COMPOSITE UPLOADS, which may leave |
| 241 temporary component objects in place during the upload process.) |
| 308 | 242 |
| 309 Similarly, gsutil automatically performs resumable downloads (using HTTP | 243 Similarly, gsutil automatically performs resumable downloads (using HTTP |
| 310 standard Range GET operations) whenever you use the cp command to download an | 244 standard Range GET operations) whenever you use the cp command to download an |
| 311 object larger than 2 MB. | 245 object larger than 2 MB. In this case the partially downloaded file will be |
| 246 visible as soon as it starts being written. Thus, before you attempt to use |
| 247 any files downloaded by gsutil you should make sure the download completed |
| 248 successfully, by checking the exit status from the gsutil command. This can |
| 249 be done in a bash script, for example, by doing: |
| 250 |
| 251 gsutil cp gs://your-bucket/your-object ./local-file |
| 252 if [ "$?" -ne "0" ] ; then |
| 253 << Code that handles failures >> |
| 254 fi |
| 312 | 255 |
| 313 Resumable uploads and downloads store some state information in a file | 256 Resumable uploads and downloads store some state information in a file |
| 314 in ~/.gsutil named by the destination object or file. If you attempt to | 257 in ~/.gsutil named by the destination object or file. If you attempt to |
| 315 resume a transfer from a machine with a different directory, the transfer | 258 resume a transfer from a machine with a different directory, the transfer |
| 316 will start over from scratch. | 259 will start over from scratch. |
| 317 | 260 |
| 318 See also "gsutil help prod" for details on using resumable transfers | 261 See also "gsutil help prod" for details on using resumable transfers |
| 319 in production. | 262 in production. |
| 320 """ | 263 """ |
| 321 | 264 |
| 322 STREAMING_TRANSFERS_TEXT = """ | 265 STREAMING_TRANSFERS_TEXT = """ |
| 323 <B>STREAMING TRANSFERS</B> | 266 <B>STREAMING TRANSFERS</B> |
| 324 Use '-' in place of src_uri or dst_uri to perform a streaming | 267 Use '-' in place of src_url or dst_url to perform a streaming |
| 325 transfer. For example: | 268 transfer. For example: |
| 326 | 269 |
| 327 long_running_computation | gsutil cp - gs://my_bucket/obj | 270 long_running_computation | gsutil cp - gs://my_bucket/obj |
| 328 | 271 |
| 329 Streaming transfers do not support resumable uploads/downloads. | 272 Streaming transfers do not support resumable uploads/downloads. |
| 330 (The Google resumable transfer protocol has a way to support streaming | 273 (The Google resumable transfer protocol has a way to support streaming |
| 331 transfers, but gsutil doesn't currently implement support for this.) | 274 transfers, but gsutil doesn't currently implement support for this.) |
| 332 """ | 275 """ |
| 333 | 276 |
| 334 PARALLEL_COMPOSITE_UPLOADS_TEXT = """ | 277 PARALLEL_COMPOSITE_UPLOADS_TEXT = """ |
| 335 <B>PARALLEL COMPOSITE UPLOADS</B> | 278 <B>PARALLEL COMPOSITE UPLOADS</B> |
| 336 gsutil automatically uses `object composition <https://developers.google.com/s
torage/docs/composite-objects>`_ | 279 gsutil can automatically use |
| 337 to perform uploads in parallel for large, local files being uploaded to | 280 `object composition <https://developers.google.com/storage/docs/composite-obje
cts>`_ |
| 338 Google Cloud Storage. This means that, by default, a large file will be split | 281 to perform uploads in parallel for large, local files being uploaded to Google |
| 339 into component pieces that will be uploaded in parallel. Those components will | 282 Cloud Storage. This means that, if enabled (see next paragraph), a large file |
| 340 then be composed in the cloud, and the temporary components in the cloud will | 283 will be split into component pieces that will be uploaded in parallel. Those |
| 341 be deleted after successful composition. No additional local disk space is | 284 components will then be composed in the cloud, and the temporary components in |
| 342 required for this operation. | 285 the cloud will be deleted after successful composition. No additional local |
| 286 disk space is required for this operation. |
| 343 | 287 |
| 344 Any file whose size exceeds the "parallel_composite_upload_threshold" config | 288 If the "parallel_composite_upload_threshold" config value is not 0 (which |
| 345 variable will trigger this feature by default. The ideal size of a | 289 disables the feature), any file whose size exceeds the specified size will |
| 346 component can also be set with the "parallel_composite_upload_component_size" | 290 trigger a parallel composite upload. Note that at present parallel composite |
| 347 config variable. See the .boto config file for details about how these values | 291 uploads are disabled by default, because using composite objects requires a |
| 348 are used. | 292 compiled crcmod (see "gsutil help crcmod"), and for operating systems that |
| 293 don't already have this package installed this makes gsutil harder to use. |
| 294 Google is actively working with a number of the Linux distributions to get |
| 295 crcmod included with the stock distribution. Once that is done we will |
| 296 re-enable parallel composite uploads by default in gsutil. |
| 297 |
| 298 The ideal size of a component can also be set with the |
| 299 "parallel_composite_upload_component_size" config variable. See the comments |
| 300 in the .boto config file for details about how these values are used. |
| 349 | 301 |
| 350 If the transfer fails prior to composition, running the command again will | 302 If the transfer fails prior to composition, running the command again will |
| 351 take advantage of resumable uploads for those components that failed, and | 303 take advantage of resumable uploads for those components that failed, and |
| 352 the component objects will be deleted after the first successful attempt. | 304 the component objects will be deleted after the first successful attempt. |
| 353 Any temporary objects that were uploaded successfully before gsutil failed | 305 Any temporary objects that were uploaded successfully before gsutil failed |
| 354 will still exist until the upload is completed successfully. The temporary | 306 will still exist until the upload is completed successfully. The temporary |
| 355 objects will be named in the following fashion: | 307 objects will be named in the following fashion: |
| 356 <random ID>%s<hash> | 308 <random ID>%s<hash> |
| 357 where <random ID> is some numerical value, and <hash> is an MD5 hash (not | 309 where <random ID> is some numerical value, and <hash> is an MD5 hash (not |
| 358 related to the hash of the contents of the file or object). | 310 related to the hash of the contents of the file or object). |
| 359 | 311 |
| 312 To avoid leaving temporary objects around, you should make sure to check the |
| 313 exit status from the gsutil command. This can be done in a bash script, for |
| 314 example, by doing: |
| 315 |
| 316 gsutil cp ./local-file gs://your-bucket/your-object |
| 317 if [ "$?" -ne "0" ] ; then |
| 318 << Code that handles failures >> |
| 319 fi |
| 320 |
| 321 Or, for copying a directory, use this instead: |
| 322 |
| 323 gsutil cp -c -L cp.log -R ./dir gs://bucket |
| 324 if [ "$?" -ne "0" ] ; then |
| 325 << Code that handles failures >> |
| 326 fi |
| 327 |
| 360 One important caveat is that files uploaded in this fashion are still subject | 328 One important caveat is that files uploaded in this fashion are still subject |
| 361 to the maximum number of components limit. For example, if you upload a large | 329 to the maximum number of components limit. For example, if you upload a large |
| 362 file that gets split into %d components, and try to compose it with another | 330 file that gets split into %d components, and try to compose it with another |
| 363 object with %d components, the operation will fail because it exceeds the %d | 331 object with %d components, the operation will fail because it exceeds the %d |
| 364 component limit. If you wish to compose an object later and the component | 332 component limit. If you wish to compose an object later and the component |
| 365 limit is a concern, it is recommended that you disable parallel composite | 333 limit is a concern, it is recommended that you disable parallel composite |
| 366 uploads for that transfer. | 334 uploads for that transfer. |
| 367 | 335 |
| 368 Also note that an object uploaded using this feature will have a CRC32C hash, | 336 Also note that an object uploaded using this feature will have a CRC32C hash, |
| 369 but it will not have an MD5 hash. For details see 'gsutil help crc32c'. | 337 but it will not have an MD5 hash. For details see 'gsutil help crc32c'. |
| 370 | 338 |
| 371 Note that this feature can be completely disabled by setting the | 339 Note that this feature can be completely disabled by setting the |
| 372 "parallel_composite_upload_threshold" variable in the .boto config file to 0. | 340 "parallel_composite_upload_threshold" variable in the .boto config file to 0. |
| 373 """ % (PARALLEL_UPLOAD_TEMP_NAMESPACE, 10, MAX_COMPONENT_COUNT - 9, | 341 """ % (PARALLEL_UPLOAD_TEMP_NAMESPACE, 10, MAX_COMPONENT_COUNT - 9, |
| 374 MAX_COMPONENT_COUNT) | 342 MAX_COMPONENT_COUNT) |
| 375 | 343 |
| 344 |
| 376 CHANGING_TEMP_DIRECTORIES_TEXT = """ | 345 CHANGING_TEMP_DIRECTORIES_TEXT = """ |
| 377 <B>CHANGING TEMP DIRECTORIES</B> | 346 <B>CHANGING TEMP DIRECTORIES</B> |
| 378 gsutil writes data to a temporary directory in several cases: | 347 gsutil writes data to a temporary directory in several cases: |
| 379 | 348 |
| 380 - when compressing data to be uploaded (see the -z option) | 349 - when compressing data to be uploaded (see the -z option) |
| 381 - when decompressing data being downloaded (when the data has | 350 - when decompressing data being downloaded (when the data has |
| 382 Content-Encoding:gzip, e.g., as happens when uploaded using gsutil cp -z) | 351 Content-Encoding:gzip, e.g., as happens when uploaded using gsutil cp -z) |
| 383 - when running integration tests (using the gsutil test command) | 352 - when running integration tests (using the gsutil test command) |
| 384 | 353 |
| 385 In these cases it's possible the temp file location on your system that | 354 In these cases it's possible the temp file location on your system that |
| (...skipping 12 matching lines...) Expand all Loading... |
| 398 export TMPDIR=/some/directory | 367 export TMPDIR=/some/directory |
| 399 | 368 |
| 400 On Windows 7 you can change the TMPDIR environment variable from Start -> | 369 On Windows 7 you can change the TMPDIR environment variable from Start -> |
| 401 Computer -> System -> Advanced System Settings -> Environment Variables. | 370 Computer -> System -> Advanced System Settings -> Environment Variables. |
| 402 You need to reboot after making this change for it to take effect. (Rebooting | 371 You need to reboot after making this change for it to take effect. (Rebooting |
| 403 is not necessary after running the export command on Linux and MacOS.) | 372 is not necessary after running the export command on Linux and MacOS.) |
| 404 """ | 373 """ |
| 405 | 374 |
| 406 OPTIONS_TEXT = """ | 375 OPTIONS_TEXT = """ |
| 407 <B>OPTIONS</B> | 376 <B>OPTIONS</B> |
| 408 -a canned_acl Sets named canned_acl when uploaded objects created. See | 377 -a canned_acl Sets named canned_acl when uploaded objects created. See |
| 409 'gsutil help acls' for further details. | 378 'gsutil help acls' for further details. |
| 410 | 379 |
| 411 -c If an error occurs, continue to attempt to copy the remaining | 380 -c If an error occurs, continue to attempt to copy the remaining |
| 412 files. Note that this option is always true when running | 381 files. If any copies were unsuccessful, gsutil's exit status |
| 413 "gsutil -m cp". | 382 will be non-zero even if this flag is set. This option is |
| 383 implicitly set when running "gsutil -m cp...". Note: -c only |
| 384 applies to the actual copying operation. If an error occurs |
| 385 while iterating over the files in the local directory (e.g., |
| 386 invalid Unicode file name) gsutil will print an error message |
| 387 and abort. |
| 414 | 388 |
| 415 -D Copy in "daisy chain" mode, i.e., copying between two buckets by | 389 -D Copy in "daisy chain" mode, i.e., copying between two buckets |
| 416 hooking a download to an upload, via the machine where gsutil is | 390 by hooking a download to an upload, via the machine where |
| 417 run. By default, data are copied between two buckets | 391 gsutil is run. By default, data are copied between two buckets |
| 418 "in the cloud", i.e., without needing to copy via the machine | 392 "in the cloud", i.e., without needing to copy via the machine |
| 419 where gsutil runs. | 393 where gsutil runs. |
| 420 | 394 |
| 421 By default, a "copy in the cloud" when the source is a composite | 395 By default, a "copy in the cloud" when the source is a |
| 422 object will retain the composite nature of the object. However, | 396 composite object will retain the composite nature of the |
| 423 Daisy chain mode can be used to change a composite object into | 397 object. However, Daisy chain mode can be used to change a |
| 424 a non-composite object. For example: | 398 composite object into a non-composite object. For example: |
| 425 | 399 |
| 426 gsutil cp -D -p gs://bucket/obj gs://bucket/obj_tmp | 400 gsutil cp -D -p gs://bucket/obj gs://bucket/obj_tmp |
| 427 gsutil mv -p gs://bucket/obj_tmp gs://bucket/obj | 401 gsutil mv -p gs://bucket/obj_tmp gs://bucket/obj |
| 428 | 402 |
| 429 Note: Daisy chain mode is automatically used when copying | 403 Note: Daisy chain mode is automatically used when copying |
| 430 between providers (e.g., to copy data from Google Cloud Storage | 404 between providers (e.g., to copy data from Google Cloud Storage |
| 431 to another provider). | 405 to another provider). |
| 432 | 406 |
| 433 -e Exclude symlinks. When specified, symbolic links will not be | 407 -e Exclude symlinks. When specified, symbolic links will not be |
| 434 copied. | 408 copied. |
| 435 | 409 |
| 436 -L <file> Outputs a manifest log file with detailed information about each | 410 -I Causes gsutil to read the list of files or objects to copy from |
| 437 item that was copied. This manifest contains the following | 411 stdin. This allows you to run a program that generates the list |
| 438 information for each item: | 412 of files to upload/download. |
| 439 | 413 |
| 440 - Source path. | 414 -L <file> Outputs a manifest log file with detailed information about |
| 441 - Destination path. | 415 each item that was copied. This manifest contains the following |
| 442 - Source size. | 416 information for each item: |
| 443 - Bytes transferred. | |
| 444 - MD5 hash. | |
| 445 - UTC date and time transfer was started in ISO 8601 format. | |
| 446 - UTC date and time transfer was completed in ISO 8601 format. | |
| 447 - Upload id, if a resumable upload was performed. | |
| 448 - Final result of the attempted transfer, success or failure. | |
| 449 - Failure details, if any. | |
| 450 | 417 |
| 451 If the log file already exists, gsutil will use the file as an | 418 - Source path. |
| 452 input to the copy process, and will also append log items to the | 419 - Destination path. |
| 453 existing file. Files/objects that are marked in the existing log | 420 - Source size. |
| 454 file as having been successfully copied (or skipped) will be | 421 - Bytes transferred. |
| 455 ignored. Files/objects without entries will be copied and ones | 422 - MD5 hash. |
| 456 previously marked as unsuccessful will be retried. This can be | 423 - UTC date and time transfer was started in ISO 8601 format. |
| 457 used in conjunction with the -c option to build a script that | 424 - UTC date and time transfer was completed in ISO 8601 format. |
| 458 copies a large number of objects reliably, using a bash script | 425 - Upload id, if a resumable upload was performed. |
| 459 like the following: | 426 - Final result of the attempted transfer, success or failure. |
| 427 - Failure details, if any. |
| 460 | 428 |
| 461 status=1 | 429 If the log file already exists, gsutil will use the file as an |
| 462 while [ $status -ne 0 ] ; do | 430 input to the copy process, and will also append log items to |
| 463 gsutil cp -c -L cp.log -R ./dir gs://bucket | 431 the existing file. Files/objects that are marked in the |
| 464 status=$? | 432 existing log file as having been successfully copied (or |
| 465 done | 433 skipped) will be ignored. Files/objects without entries will be |
| 434 copied and ones previously marked as unsuccessful will be |
| 435 retried. This can be used in conjunction with the -c option to |
| 436 build a script that copies a large number of objects reliably, |
| 437 using a bash script like the following: |
| 466 | 438 |
| 467 The -c option will cause copying to continue after failures | 439 until gsutil cp -c -L cp.log -R ./dir gs://bucket; do |
| 468 occur, and the -L option will allow gsutil to pick up where it | 440 sleep 1 |
| 469 left off without duplicating work. The loop will continue | 441 done |
| 470 running as long as gsutil exits with a non-zero status (such a | |
| 471 status indicates there was at least one failure during the | |
| 472 gsutil run). | |
| 473 | 442 |
| 474 -n No-clobber. When specified, existing files or objects at the | 443 The -c option will cause copying to continue after failures |
| 475 destination will not be overwritten. Any items that are skipped | 444 occur, and the -L option will allow gsutil to pick up where it |
| 476 by this option will be reported as being skipped. This option | 445 left off without duplicating work. The loop will continue |
| 477 will perform an additional HEAD request to check if an item | 446 running as long as gsutil exits with a non-zero status (such a |
| 478 exists before attempting to upload the data. This will save | 447 status indicates there was at least one failure during the |
| 479 retransmitting data, but the additional HTTP requests may make | 448 gsutil run). |
| 480 small object transfers slower and more expensive. | |
| 481 | 449 |
| 482 -p Causes source ACLs to be preserved when copying in the cloud. | 450 Note: If you're trying to synchronize the contents of a |
| 483 Note that this option has performance and cost implications, | 451 directory and a bucket (or two buckets), see |
| 484 because it is essentially performing three requests ('acl get', | 452 'gsutil help rsync'. |
| 485 cp, 'acl set'). (The performance issue can be mitigated to some | |
| 486 degree by using gsutil -m cp to cause parallel copying.) | |
| 487 | 453 |
| 488 You can avoid the additional performance and cost of using cp -p | 454 -n No-clobber. When specified, existing files or objects at the |
| 489 if you want all objects in the destination bucket to end up with | 455 destination will not be overwritten. Any items that are skipped |
| 490 the same ACL by setting a default ACL on that bucket instead of | 456 by this option will be reported as being skipped. This option |
| 491 using cp -p. See "help gsutil defacl". | 457 will perform an additional GET request to check if an item |
| 458 exists before attempting to upload the data. This will save |
| 459 retransmitting data, but the additional HTTP requests may make |
| 460 small object transfers slower and more expensive. |
| 492 | 461 |
| 493 Note that it's not valid to specify both the -a and -p options | 462 -p Causes ACLs to be preserved when copying in the cloud. Note |
| 494 together. | 463 that this option has performance and cost implications when |
| 464 using the XML API, as it requires separate HTTP calls for |
| 465 interacting with ACLs. The performance issue can be mitigated |
| 466 to some degree by using gsutil -m cp to cause parallel copying. |
| 467 Also, this option only works if you have OWNER access to all of |
| 468 the objects that are copied. |
| 495 | 469 |
| 496 -q Deprecated. Please use gsutil -q cp ... instead. | 470 You can avoid the additional performance and cost of using |
| 471 cp -p if you want all objects in the destination bucket to end |
| 472 up with the same ACL by setting a default object ACL on that |
| 473 bucket instead of using cp -p. See "help gsutil defacl". |
| 497 | 474 |
| 498 -R, -r Causes directories, buckets, and bucket subdirectories to be | 475 Note that it's not valid to specify both the -a and -p options |
| 499 copied recursively. If you neglect to use this option for | 476 together. |
| 500 an upload, gsutil will copy any files it finds and skip any | |
| 501 directories. Similarly, neglecting to specify -R for a download | |
| 502 will cause gsutil to copy any objects at the current bucket | |
| 503 directory level, and skip any subdirectories. | |
| 504 | 477 |
| 505 -v Requests that the version-specific URI for each uploaded object | 478 -R, -r Causes directories, buckets, and bucket subdirectories to be |
| 506 be printed. Given this URI you can make future upload requests | 479 copied recursively. If you neglect to use this option for |
| 507 that are safe in the face of concurrent updates, because Google | 480 an upload, gsutil will copy any files it finds and skip any |
| 508 Cloud Storage will refuse to perform the update if the current | 481 directories. Similarly, neglecting to specify -R for a download |
| 509 object version doesn't match the version-specific URI. See | 482 will cause gsutil to copy any objects at the current bucket |
| 510 'gsutil help versions' for more details. | 483 directory level, and skip any subdirectories. |
| 511 | 484 |
| 512 -z <ext,...> Applies gzip content-encoding to file uploads with the given | 485 -v Requests that the version-specific URL for each uploaded object |
| 513 extensions. This is useful when uploading files with | 486 be printed. Given this URL you can make future upload requests |
| 514 compressible content (such as .js, .css, or .html files) because | 487 that are safe in the face of concurrent updates, because Google |
| 515 it saves network bandwidth and space in Google Cloud Storage, | 488 Cloud Storage will refuse to perform the update if the current |
| 516 which in turn reduces storage costs. | 489 object version doesn't match the version-specific URL. See |
| 490 'gsutil help versions' for more details. |
| 517 | 491 |
| 518 When you specify the -z option, the data from your files is | 492 -z <ext,...> Applies gzip content-encoding to file uploads with the given |
| 519 compressed before it is uploaded, but your actual files are left | 493 extensions. This is useful when uploading files with |
| 520 uncompressed on the local disk. The uploaded objects retain the | 494 compressible content (such as .js, .css, or .html files) |
| 521 Content-Type and name of the original files but are given a | 495 because it saves network bandwidth and space in Google Cloud |
| 522 Content-Encoding header with the value "gzip" to indicate that | 496 Storage, which in turn reduces storage costs. |
| 523 the object data stored are compressed on the Google Cloud | |
| 524 Storage servers. | |
| 525 | 497 |
| 526 For example, the following command: | 498 When you specify the -z option, the data from your files is |
| 499 compressed before it is uploaded, but your actual files are |
| 500 left uncompressed on the local disk. The uploaded objects |
| 501 retain the Content-Type and name of the original files but are |
| 502 given a Content-Encoding header with the value "gzip" to |
| 503 indicate that the object data stored are compressed on the |
| 504 Google Cloud Storage servers. |
| 527 | 505 |
| 528 gsutil cp -z html -a public-read cattypes.html gs://mycats | 506 For example, the following command: |
| 529 | 507 |
| 530 will do all of the following: | 508 gsutil cp -z html -a public-read cattypes.html gs://mycats |
| 531 | 509 |
| 532 - Upload as the object gs://mycats/cattypes.html (cp command) | 510 will do all of the following: |
| 533 - Set the Content-Type to text/html (based on file extension) | 511 |
| 534 - Compress the data in the file cattypes.html (-z option) | 512 - Upload as the object gs://mycats/cattypes.html (cp command) |
| 535 - Set the Content-Encoding to gzip (-z option) | 513 - Set the Content-Type to text/html (based on file extension) |
| 536 - Set the ACL to public-read (-a option) | 514 - Compress the data in the file cattypes.html (-z option) |
| 537 - If a user tries to view cattypes.html in a browser, the | 515 - Set the Content-Encoding to gzip (-z option) |
| 538 browser will know to uncompress the data based on the | 516 - Set the ACL to public-read (-a option) |
| 539 Content-Encoding header, and to render it as HTML based on | 517 - If a user tries to view cattypes.html in a browser, the |
| 540 the Content-Type header. | 518 browser will know to uncompress the data based on the |
| 519 Content-Encoding header, and to render it as HTML based on |
| 520 the Content-Type header. |
| 521 |
| 522 Note that if you download an object with Content-Encoding:gzip |
| 523 gsutil will decompress the content before writing the local |
| 524 file. |
| 541 """ | 525 """ |
| 542 | 526 |
| 543 _detailed_help_text = '\n\n'.join([SYNOPSIS_TEXT, | 527 _DETAILED_HELP_TEXT = '\n\n'.join([SYNOPSIS_TEXT, |
| 544 DESCRIPTION_TEXT, | 528 DESCRIPTION_TEXT, |
| 545 NAME_CONSTRUCTION_TEXT, | 529 NAME_CONSTRUCTION_TEXT, |
| 546 SUBDIRECTORIES_TEXT, | 530 SUBDIRECTORIES_TEXT, |
| 547 COPY_IN_CLOUD_TEXT, | 531 COPY_IN_CLOUD_TEXT, |
| 532 FAILURE_HANDLING_TEXT, |
| 548 RESUMABLE_TRANSFERS_TEXT, | 533 RESUMABLE_TRANSFERS_TEXT, |
| 549 STREAMING_TRANSFERS_TEXT, | 534 STREAMING_TRANSFERS_TEXT, |
| 550 PARALLEL_COMPOSITE_UPLOADS_TEXT, | 535 PARALLEL_COMPOSITE_UPLOADS_TEXT, |
| 551 CHANGING_TEMP_DIRECTORIES_TEXT, | 536 CHANGING_TEMP_DIRECTORIES_TEXT, |
| 552 OPTIONS_TEXT]) | 537 OPTIONS_TEXT]) |
| 553 | 538 |
| 554 # This tuple is used only to encapsulate the arguments needed for | |
| 555 # _PerformResumableUploadIfApplies, so that the arguments fit the model of | |
| 556 # command.Apply(). | |
| 557 PerformResumableUploadIfAppliesArgs = namedtuple( | |
| 558 'PerformResumableUploadIfAppliesArgs', | |
| 559 'filename file_start file_length src_uri dst_uri canned_acl headers ' | |
| 560 'tracker_file tracker_file_lock') | |
| 561 | 539 |
| 562 ObjectFromTracker = namedtuple('ObjectFromTracker', | 540 CP_SUB_ARGS = 'a:cDeIL:MNnprRtvz:' |
| 563 'object_name generation') | |
| 564 | |
| 565 CP_SUB_ARGS = 'a:cDeIL:MNnpqrRtvz:' | |
| 566 | |
| 567 # The maximum length of a file name can vary wildly between different | |
| 568 # operating systems, so we always ensure that tracker files are less | |
| 569 # than 100 characters in order to avoid any such issues. | |
| 570 MAX_TRACKER_FILE_NAME_LENGTH = 100 | |
| 571 | 541 |
| 572 | 542 |
| 573 class TrackerFileType(object): | 543 def _CopyFuncWrapper(cls, args, thread_state=None): |
| 574 UPLOAD = 1 | 544 cls.CopyFunc(args, thread_state=thread_state) |
| 575 DOWNLOAD = 2 | |
| 576 PARALLEL_UPLOAD = 3 | |
| 577 | 545 |
| 578 def _CopyFuncWrapper(cls, args): | |
| 579 cls._CopyFunc(args) | |
| 580 | |
| 581 def _PerformResumableUploadIfAppliesWrapper(cls, args): | |
| 582 """A wrapper for cp._PerformResumableUploadIfApplies, which takes in a | |
| 583 PerformResumableUploadIfAppliesArgs, extracts the arguments to form the | |
| 584 arguments for the wrapped function, and then calls the wrapped function. | |
| 585 This was designed specifically for use with command.Apply(). | |
| 586 """ | |
| 587 fp = FilePart(args.filename, args.file_start, args.file_length) | |
| 588 with fp: | |
| 589 already_split = True | |
| 590 ret = cls._PerformResumableUploadIfApplies( | |
| 591 fp, args.src_uri, args.dst_uri, args.canned_acl, args.headers, | |
| 592 fp.length, already_split) | |
| 593 | |
| 594 # Update the tracker file after each call in order to be as robust as possible | |
| 595 # against interrupts, failures, etc. | |
| 596 component = ret[2] | |
| 597 _AppendComponentTrackerToParallelUploadTrackerFile(args.tracker_file, | |
| 598 component, | |
| 599 args.tracker_file_lock) | |
| 600 return ret | |
| 601 | 546 |
| 602 def _CopyExceptionHandler(cls, e): | 547 def _CopyExceptionHandler(cls, e): |
| 603 """Simple exception handler to allow post-completion status.""" | 548 """Simple exception handler to allow post-completion status.""" |
| 604 cls.logger.error(str(e)) | 549 cls.logger.error(str(e)) |
| 605 cls.copy_failure_count += 1 | 550 cls.op_failure_count += 1 |
| 606 cls.logger.debug(('\n\nEncountered exception while copying:\n%s\n' % | 551 cls.logger.debug('\n\nEncountered exception while copying:\n%s\n', |
| 607 traceback.format_exc())) | 552 traceback.format_exc()) |
| 553 |
| 608 | 554 |
| 609 def _RmExceptionHandler(cls, e): | 555 def _RmExceptionHandler(cls, e): |
| 610 """Simple exception handler to allow post-completion status.""" | 556 """Simple exception handler to allow post-completion status.""" |
| 611 cls.logger.error(str(e)) | 557 cls.logger.error(str(e)) |
| 612 | 558 |
| 559 |
| 613 class CpCommand(Command): | 560 class CpCommand(Command): |
| 614 """ | 561 """Implementation of gsutil cp command. |
| 615 Implementation of gsutil cp command. | |
| 616 | 562 |
| 617 Note that CpCommand is run for both gsutil cp and gsutil mv. The latter | 563 Note that CpCommand is run for both gsutil cp and gsutil mv. The latter |
| 618 happens by MvCommand calling CpCommand and passing the hidden (undocumented) | 564 happens by MvCommand calling CpCommand and passing the hidden (undocumented) |
| 619 -M option. This allows the copy and remove needed for each mv to run | 565 -M option. This allows the copy and remove needed for each mv to run |
| 620 together (rather than first running all the cp's and then all the rm's, as | 566 together (rather than first running all the cp's and then all the rm's, as |
| 621 we originally had implemented), which in turn avoids the following problem | 567 we originally had implemented), which in turn avoids the following problem |
| 622 with removing the wrong objects: starting with a bucket containing only | 568 with removing the wrong objects: starting with a bucket containing only |
| 623 the object gs://bucket/obj, say the user does: | 569 the object gs://bucket/obj, say the user does: |
| 624 gsutil mv gs://bucket/* gs://bucket/d.txt | 570 gsutil mv gs://bucket/* gs://bucket/d.txt |
| 625 If we ran all the cp's and then all the rm's and we didn't expand the wildcard | 571 If we ran all the cp's and then all the rm's and we didn't expand the wildcard |
| 626 first, the cp command would first copy gs://bucket/obj to gs://bucket/d.txt, | 572 first, the cp command would first copy gs://bucket/obj to gs://bucket/d.txt, |
| 627 and the rm command would then remove that object. In the implementation | 573 and the rm command would then remove that object. In the implementation |
| 628 prior to gsutil release 3.12 we avoided this by building a list of objects | 574 prior to gsutil release 3.12 we avoided this by building a list of objects |
| 629 to process and then running the copies and then the removes; but building | 575 to process and then running the copies and then the removes; but building |
| 630 the list up front limits scalability (compared with the current approach | 576 the list up front limits scalability (compared with the current approach |
| 631 of processing the bucket listing iterator on the fly). | 577 of processing the bucket listing iterator on the fly). |
| 632 """ | 578 """ |
| 633 | 579 |
| 634 # TODO: Refactor this file to be less cumbersome. In particular, some of the | 580 # Command specification. See base class for documentation. |
| 635 # different paths (e.g., uploading a file to an object vs. downloading an | 581 command_spec = Command.CreateCommandSpec( |
| 636 # object to a file) could be split into separate files. | 582 'cp', |
| 637 | 583 command_name_aliases=['copy'], |
| 638 # Set default Content-Type type. | 584 min_args=1, |
| 639 DEFAULT_CONTENT_TYPE = 'application/octet-stream' | 585 max_args=NO_MAX, |
| 640 USE_MAGICFILE = boto.config.getbool('GSUtil', 'use_magicfile', False) | 586 # -t is deprecated but leave intact for now to avoid breakage. |
| 641 # Chunk size to use while unzipping gzip files. | 587 supported_sub_args=CP_SUB_ARGS, |
| 642 GUNZIP_CHUNK_SIZE = 8192 | 588 file_url_ok=True, |
| 643 | 589 provider_url_ok=False, |
| 644 # Command specification (processed by parent class). | 590 urls_start_arg=0, |
| 645 command_spec = { | 591 gs_api_support=[ApiSelector.XML, ApiSelector.JSON], |
| 646 # Name of command. | 592 gs_default_api=ApiSelector.JSON, |
| 647 COMMAND_NAME : 'cp', | 593 supported_private_args=['haltatbyte='], |
| 648 # List of command name aliases. | 594 ) |
| 649 COMMAND_NAME_ALIASES : ['copy'], | 595 # Help specification. See help_provider.py for documentation. |
| 650 # Min number of args required by this command. | 596 help_spec = Command.HelpSpec( |
| 651 MIN_ARGS : 1, | 597 help_name='cp', |
| 652 # Max number of args required by this command, or NO_MAX. | 598 help_name_aliases=['copy'], |
| 653 MAX_ARGS : NO_MAX, | 599 help_type='command_help', |
| 654 # Getopt-style string specifying acceptable sub args. | 600 help_one_line_summary='Copy files and objects', |
| 655 # -t is deprecated but leave intact for now to avoid breakage. | 601 help_text=_DETAILED_HELP_TEXT, |
| 656 SUPPORTED_SUB_ARGS : CP_SUB_ARGS, | 602 subcommand_help_text={}, |
| 657 # True if file URIs acceptable for this command. | 603 ) |
| 658 FILE_URIS_OK : True, | 604 |
| 659 # True if provider-only URIs acceptable for this command. | 605 # pylint: disable=too-many-statements |
| 660 PROVIDER_URIS_OK : False, | 606 def CopyFunc(self, name_expansion_result, thread_state=None): |
| 661 # Index in args of first URI arg. | |
| 662 URIS_START_ARG : 0, | |
| 663 } | |
| 664 help_spec = { | |
| 665 # Name of command or auxiliary help info for which this help applies. | |
| 666 HELP_NAME : 'cp', | |
| 667 # List of help name aliases. | |
| 668 HELP_NAME_ALIASES : ['copy'], | |
| 669 # Type of help: | |
| 670 HELP_TYPE : HelpType.COMMAND_HELP, | |
| 671 # One line summary of this help. | |
| 672 HELP_ONE_LINE_SUMMARY : 'Copy files and objects', | |
| 673 # The full help text. | |
| 674 HELP_TEXT : _detailed_help_text, | |
| 675 } | |
| 676 | |
| 677 def _GetMD5FromETag(self, key): | |
| 678 if not key.etag: | |
| 679 return None | |
| 680 possible_md5 = key.etag.strip('"\'').lower() | |
| 681 if re.match(r'^[0-9a-f]{32}$', possible_md5): | |
| 682 return binascii.a2b_hex(possible_md5) | |
| 683 | |
| 684 def _CheckHashes(self, key, file_name, hash_algs_to_compute, | |
| 685 computed_hashes=None): | |
| 686 """Validates integrity by comparing cloud digest to local digest. | |
| 687 | |
| 688 Args: | |
| 689 key: Instance of boto Key object. | |
| 690 file_name: Name of downloaded file on local disk. | |
| 691 hash_algs_to_compute: Dictionary mapping hash algorithm names to digester | |
| 692 objects. | |
| 693 computed_hashes: If specified, use this dictionary mapping hash algorithm | |
| 694 names to the calculated digest. If not specified, the | |
| 695 key argument will be checked for local_digests property. | |
| 696 If neither exist, the local file will be opened and | |
| 697 digests calculated on-demand. | |
| 698 | |
| 699 Raises: | |
| 700 CommandException: if cloud digests don't match local digests. | |
| 701 """ | |
| 702 cloud_hashes = {} | |
| 703 if hasattr(key, 'cloud_hashes'): | |
| 704 cloud_hashes = key.cloud_hashes | |
| 705 # Check for older-style MD5-based etag. | |
| 706 etag_md5 = self._GetMD5FromETag(key) | |
| 707 if etag_md5: | |
| 708 cloud_hashes.setdefault('md5', etag_md5) | |
| 709 | |
| 710 local_hashes = {} | |
| 711 # If we've already computed a valid local hash, use that, else calculate an | |
| 712 # md5 or crc32c depending on what we have available to compare against. | |
| 713 if computed_hashes: | |
| 714 local_hashes = computed_hashes | |
| 715 elif hasattr(key, 'local_hashes') and key.local_hashes: | |
| 716 local_hashes = key.local_hashes | |
| 717 elif 'md5' in cloud_hashes and 'md5' in hash_algs_to_compute: | |
| 718 self.logger.info( | |
| 719 'Computing MD5 from scratch for resumed download') | |
| 720 | |
| 721 # Open file in binary mode to avoid surprises in Windows. | |
| 722 with open(file_name, 'rb') as fp: | |
| 723 local_hashes['md5'] = binascii.a2b_hex(key.compute_md5(fp)[0]) | |
| 724 elif 'crc32c' in cloud_hashes and 'crc32c' in hash_algs_to_compute: | |
| 725 self.logger.info( | |
| 726 'Computing CRC32C from scratch for resumed download') | |
| 727 | |
| 728 # Open file in binary mode to avoid surprises in Windows. | |
| 729 with open(file_name, 'rb') as fp: | |
| 730 crc32c_alg = lambda: crcmod.predefined.Crc('crc-32c') | |
| 731 crc32c_hex = key.compute_hash( | |
| 732 fp, algorithm=crc32c_alg)[0] | |
| 733 local_hashes['crc32c'] = binascii.a2b_hex(crc32c_hex) | |
| 734 | |
| 735 for alg in local_hashes: | |
| 736 if alg not in cloud_hashes: | |
| 737 continue | |
| 738 local_hexdigest = binascii.b2a_hex(local_hashes[alg]) | |
| 739 cloud_hexdigest = binascii.b2a_hex(cloud_hashes[alg]) | |
| 740 self.logger.debug('Comparing local vs cloud %s-checksum. (%s/%s)' % ( | |
| 741 alg, local_hexdigest, cloud_hexdigest)) | |
| 742 if local_hexdigest != cloud_hexdigest: | |
| 743 raise CommandException( | |
| 744 '%s signature computed for local file (%s) doesn\'t match ' | |
| 745 'cloud-supplied digest (%s). Local file (%s) deleted.' % ( | |
| 746 alg, local_hexdigest, cloud_hexdigest, file_name)) | |
| 747 | |
| 748 def _CheckForDirFileConflict(self, exp_src_uri, dst_uri): | |
| 749 """Checks whether copying exp_src_uri into dst_uri is not possible. | |
| 750 | |
| 751 This happens if a directory exists in local file system where a file | |
| 752 needs to go or vice versa. In that case we print an error message and | |
| 753 exit. Example: if the file "./x" exists and you try to do: | |
| 754 gsutil cp gs://mybucket/x/y . | |
| 755 the request can't succeed because it requires a directory where | |
| 756 the file x exists. | |
| 757 | |
| 758 Note that we don't enforce any corresponding restrictions for buckets, | |
| 759 because the flat namespace semantics for buckets doesn't prohibit such | |
| 760 cases the way hierarchical file systems do. For example, if a bucket | |
| 761 contains an object called gs://bucket/dir and then you run the command: | |
| 762 gsutil cp file1 file2 gs://bucket/dir | |
| 763 you'll end up with objects gs://bucket/dir, gs://bucket/dir/file1, and | |
| 764 gs://bucket/dir/file2. | |
| 765 | |
| 766 Args: | |
| 767 exp_src_uri: Expanded source StorageUri of copy. | |
| 768 dst_uri: Destination URI. | |
| 769 | |
| 770 Raises: | |
| 771 CommandException: if errors encountered. | |
| 772 """ | |
| 773 if dst_uri.is_cloud_uri(): | |
| 774 # The problem can only happen for file destination URIs. | |
| 775 return | |
| 776 dst_path = dst_uri.object_name | |
| 777 final_dir = os.path.dirname(dst_path) | |
| 778 if os.path.isfile(final_dir): | |
| 779 raise CommandException('Cannot retrieve %s because a file exists ' | |
| 780 'where a directory needs to be created (%s).' % | |
| 781 (exp_src_uri, final_dir)) | |
| 782 if os.path.isdir(dst_path): | |
| 783 raise CommandException('Cannot retrieve %s because a directory exists ' | |
| 784 '(%s) where the file needs to be created.' % | |
| 785 (exp_src_uri, dst_path)) | |
| 786 | |
| 787 def _InsistDstUriNamesContainer(self, exp_dst_uri, | |
| 788 have_existing_dst_container, command_name): | |
| 789 """ | |
| 790 Raises an exception if URI doesn't name a directory, bucket, or bucket | |
| 791 subdir, with special exception for cp -R (see comments below). | |
| 792 | |
| 793 Args: | |
| 794 exp_dst_uri: Wildcard-expanded dst_uri. | |
| 795 have_existing_dst_container: bool indicator of whether exp_dst_uri | |
| 796 names a container (directory, bucket, or existing bucket subdir). | |
| 797 command_name: Name of command making call. May not be the same as | |
| 798 self.command_name in the case of commands implemented atop other | |
| 799 commands (like mv command). | |
| 800 | |
| 801 Raises: | |
| 802 CommandException: if the URI being checked does not name a container. | |
| 803 """ | |
| 804 if exp_dst_uri.is_file_uri(): | |
| 805 ok = exp_dst_uri.names_directory() | |
| 806 else: | |
| 807 if have_existing_dst_container: | |
| 808 ok = True | |
| 809 else: | |
| 810 # It's ok to specify a non-existing bucket subdir, for example: | |
| 811 # gsutil cp -R dir gs://bucket/abc | |
| 812 # where gs://bucket/abc isn't an existing subdir. | |
| 813 ok = exp_dst_uri.names_object() | |
| 814 if not ok: | |
| 815 raise CommandException('Destination URI must name a directory, bucket, ' | |
| 816 'or bucket\nsubdirectory for the multiple ' | |
| 817 'source form of the %s command.' % command_name) | |
| 818 | |
| 819 class _FileCopyCallbackHandler(object): | |
| 820 """Outputs progress info for large copy requests.""" | |
| 821 | |
| 822 def __init__(self, upload, logger): | |
| 823 if upload: | |
| 824 self.announce_text = 'Uploading' | |
| 825 else: | |
| 826 self.announce_text = 'Downloading' | |
| 827 self.logger = logger | |
| 828 | |
| 829 def call(self, total_bytes_transferred, total_size): | |
| 830 # Use sys.stderr.write instead of self.logger.info so progress messages | |
| 831 # output on a single continuously overwriting line. | |
| 832 if self.logger.isEnabledFor(logging.INFO): | |
| 833 sys.stderr.write('%s: %s/%s \r' % ( | |
| 834 self.announce_text, | |
| 835 MakeHumanReadable(total_bytes_transferred), | |
| 836 MakeHumanReadable(total_size))) | |
| 837 if total_bytes_transferred == total_size: | |
| 838 sys.stderr.write('\n') | |
| 839 | |
| 840 class _StreamCopyCallbackHandler(object): | |
| 841 """Outputs progress info for Stream copy to cloud. | |
| 842 Total Size of the stream is not known, so we output | |
| 843 only the bytes transferred. | |
| 844 """ | |
| 845 | |
| 846 def __init__(self, logger): | |
| 847 self.logger = logger | |
| 848 | |
| 849 def call(self, total_bytes_transferred, total_size): | |
| 850 # Use sys.stderr.write instead of self.logger.info so progress messages | |
| 851 # output on a single continuously overwriting line. | |
| 852 if self.logger.isEnabledFor(logging.INFO): | |
| 853 sys.stderr.write('Uploading: %s \r' % | |
| 854 MakeHumanReadable(total_bytes_transferred)) | |
| 855 if total_size and total_bytes_transferred == total_size: | |
| 856 sys.stderr.write('\n') | |
| 857 | |
def _GetTransferHandlers(self, dst_uri, size, upload):
  """
  Selects upload/download and callback handlers.

  We use a callback handler that shows a simple textual progress indicator
  if size is above the configurable threshold.

  We use a resumable transfer handler if size is >= the configurable
  threshold and resumable transfers are supported by the given provider.
  boto supports resumable downloads for all providers, but resumable
  uploads are currently only supported by GS.

  Args:
    dst_uri: the destination URI.
    size: size of file (object) being uploaded (downloaded).
    upload: bool indication of whether transfer is an upload.

  Returns:
    (cb, num_cb, transfer_handler) tuple: progress callback, the number of
    callback invocations for boto, and the resumable transfer handler; each
    is None when below threshold or unsupported for this transfer.
  """
  config = boto.config
  resumable_threshold = config.getint('GSUtil', 'resumable_threshold', TWO_MB)
  transfer_handler = None
  cb = None
  num_cb = None

  # Checks whether the destination file is a "special" file, like /dev/null on
  # Linux platforms or null on Windows platforms, so we can disable resumable
  # download support since the file size of the destination won't ever be
  # correct.
  dst_is_special = False
  if dst_uri.is_file_uri():
    # Check explicitly first because os.stat doesn't work on 'nul' in Windows.
    if dst_uri.object_name == os.devnull:
      dst_is_special = True
    try:
      mode = os.stat(dst_uri.object_name).st_mode
      if stat.S_ISCHR(mode):
        # Character device (e.g. /dev/null): its size never reflects bytes
        # written, so resumable bookkeeping can't work.
        dst_is_special = True
    except OSError:
      # Destination doesn't exist yet; treat it as a normal file.
      pass

  if size >= resumable_threshold and not dst_is_special:
    cb = self._FileCopyCallbackHandler(upload, self.logger).call
    # One progress callback per TWO_MB chunk of the transfer.
    num_cb = int(size / TWO_MB)

    if upload:
      tracker_file_type = TrackerFileType.UPLOAD
    else:
      tracker_file_type = TrackerFileType.DOWNLOAD
    tracker_file = self._GetTrackerFilePath(dst_uri, tracker_file_type)

    if upload:
      if dst_uri.scheme == 'gs':
        is_secure = BOTO_IS_SECURE
        if not is_secure[0]:
          # Resumable upload IDs act like bearer credentials, so avoid them
          # over a non-HTTPS connection; fall through with no handler.
          self.logger.info('\n'.join(textwrap.wrap(
              'WARNING: Your boto config file (%s) has is_secure set to '
              'False. Resumable uploads are not secure when performed with '
              'this configuration, so large files are being uploaded with '
              'non-resumable uploads instead.' % GetConfigFilePath())))
        else:
          transfer_handler = ResumableUploadHandler(tracker_file)
    else:
      # Resumable downloads are supported for all providers.
      transfer_handler = ResumableDownloadHandler(tracker_file)

  return (cb, num_cb, transfer_handler)
| 922 | |
def _GetTrackerFilePath(self, dst_uri, tracker_file_type, src_uri=None):
  """Returns the path of the tracker file for a resumable transfer.

  Args:
    dst_uri: Destination StorageUri of the transfer.
    tracker_file_type: TrackerFileType enum value selecting the name scheme.
    src_uri: Source URI; used only for PARALLEL_UPLOAD tracker names.

  Returns:
    Path of the tracker file inside the tracker directory.
  """
  tracker_dir = CreateTrackerDirIfNeeded()
  # Build a raw name encoding the transfer's identity, then hash it so path
  # length and unusual characters can't cause problems.
  if tracker_file_type == TrackerFileType.UPLOAD:
    # Encode the dest bucket and object name into the tracker file name.
    raw_name = 'resumable_upload__%s__%s.url' % (
        dst_uri.bucket_name, dst_uri.object_name)
    type_str = "upload"
  elif tracker_file_type == TrackerFileType.DOWNLOAD:
    # Encode the fully-qualified dest file name into the tracker file name.
    raw_name = 'resumable_download__%s.etag' % (
        os.path.realpath(dst_uri.object_name))
    type_str = "download"
  elif tracker_file_type == TrackerFileType.PARALLEL_UPLOAD:
    # Encode the dest bucket and object names as well as the source file
    # name into the tracker file name.
    raw_name = 'parallel_upload__%s__%s__%s.url' % (
        dst_uri.bucket_name, dst_uri.object_name, src_uri)
    type_str = "parallel_upload"

  hashed_name = _HashFilename(re.sub('[/\\\\]', '_', raw_name))
  tracker_file_name = '%s_%s' % (type_str, hashed_name)
  assert(len(tracker_file_name) < MAX_TRACKER_FILE_NAME_LENGTH)
  return '%s%s%s' % (tracker_dir, os.sep, tracker_file_name)
| 951 | |
| 952 def _LogCopyOperation(self, src_uri, dst_uri, headers): | |
| 953 """ | |
| 954 Logs copy operation being performed, including Content-Type if appropriate. | |
| 955 """ | |
| 956 if 'content-type' in headers and dst_uri.is_cloud_uri(): | |
| 957 content_type_msg = ' [Content-Type=%s]' % headers['content-type'] | |
| 958 else: | |
| 959 content_type_msg = '' | |
| 960 if src_uri.is_stream(): | |
| 961 self.logger.info('Copying from <STDIN>%s...', content_type_msg) | |
| 962 else: | |
| 963 self.logger.info('Copying %s%s...', src_uri, content_type_msg) | |
| 964 | |
| 965 def _ProcessCopyObjectToObjectOptions(self, dst_uri, headers): | |
| 966 """ | |
| 967 Common option processing between _CopyObjToObjInTheCloud and | |
| 968 _CopyObjToObjDaisyChainMode. | |
| 969 """ | |
| 970 preserve_acl = False | |
| 971 canned_acl = None | |
| 972 if self.sub_opts: | |
| 973 for o, a in self.sub_opts: | |
| 974 if o == '-a': | |
| 975 canned_acls = dst_uri.canned_acls() | |
| 976 if a not in canned_acls: | |
| 977 raise CommandException('Invalid canned ACL "%s".' % a) | |
| 978 canned_acl = a | |
| 979 headers[dst_uri.get_provider().acl_header] = canned_acl | |
| 980 if o == '-p': | |
| 981 preserve_acl = True | |
| 982 if preserve_acl and canned_acl: | |
| 983 raise CommandException( | |
| 984 'Specifying both the -p and -a options together is invalid.') | |
| 985 return (preserve_acl, canned_acl, headers) | |
| 986 | |
# We pass the headers explicitly to this call instead of using self.headers
# so we can set different metadata (like Content-Type type) for each object.
def _CopyObjToObjInTheCloud(self, src_key, src_uri, dst_uri, headers):
  """Performs copy-in-the cloud from specified src to dest object.

  Args:
    src_key: Source Key.
    src_uri: Source StorageUri.
    dst_uri: Destination StorageUri.
    headers: A copy of the top-level headers dictionary.

  Returns:
    (elapsed_time, bytes_transferred, dst_uri) excluding overhead like initial
    HEAD. Note: At present copy-in-the-cloud doesn't return the generation of
    the created object, so the returned URI is actually not version-specific
    (unlike other cp cases).

  Raises:
    CommandException: if errors encountered.
  """
  self._SetContentTypeHeader(src_uri, headers)
  self._LogCopyOperation(src_uri, dst_uri, headers)
  # Do Object -> object copy within same provider (uses
  # x-<provider>-copy-source metadata HTTP header to request copying at the
  # server).
  src_bucket = src_uri.get_bucket(False, headers)
  (preserve_acl, canned_acl, headers) = (
      self._ProcessCopyObjectToObjectOptions(dst_uri, headers))
  start_time = time.time()
  # Pass headers in headers param not metadata param, so boto will copy
  # existing key's metadata and just set the additional headers specified
  # in the headers param (rather than using the headers to override existing
  # metadata). In particular this allows us to copy the existing key's
  # Content-Type and other metadata users need while still being able to
  # set headers the API needs (like x-goog-project-id). Note that this means
  # you can't do something like:
  #   gsutil cp -t Content-Type text/html gs://bucket/* gs://bucket2
  # to change the Content-Type while copying.
  dst_key = dst_uri.copy_key(
      src_bucket.name, src_uri.object_name, preserve_acl=preserve_acl,
      headers=headers, src_version_id=src_uri.version_id,
      src_generation=src_uri.generation)

  end_time = time.time()
  # No bytes flow through this process for a server-side copy; report the
  # source object's size as the bytes transferred.
  return (end_time - start_time, src_key.size,
          dst_uri.clone_replace_key(dst_key))
| 1033 | |
| 1034 def _CheckFreeSpace(self, path): | |
| 1035 """Return path/drive free space (in bytes).""" | |
| 1036 if platform.system() == 'Windows': | |
| 1037 from ctypes import c_int, c_uint64, c_wchar_p, windll, POINTER, WINFUNCTYP
E, WinError | |
| 1038 try: | |
| 1039 GetDiskFreeSpaceEx = WINFUNCTYPE(c_int, c_wchar_p, POINTER(c_uint64), | |
| 1040 POINTER(c_uint64), POINTER(c_uint64)) | |
| 1041 GetDiskFreeSpaceEx = GetDiskFreeSpaceEx( | |
| 1042 ('GetDiskFreeSpaceExW', windll.kernel32), ( | |
| 1043 (1, 'lpszPathName'), | |
| 1044 (2, 'lpFreeUserSpace'), | |
| 1045 (2, 'lpTotalSpace'), | |
| 1046 (2, 'lpFreeSpace'),)) | |
| 1047 except AttributeError: | |
| 1048 GetDiskFreeSpaceEx = WINFUNCTYPE(c_int, c_char_p, POINTER(c_uint64), | |
| 1049 POINTER(c_uint64), POINTER(c_uint64)) | |
| 1050 GetDiskFreeSpaceEx = GetDiskFreeSpaceEx( | |
| 1051 ('GetDiskFreeSpaceExA', windll.kernel32), ( | |
| 1052 (1, 'lpszPathName'), | |
| 1053 (2, 'lpFreeUserSpace'), | |
| 1054 (2, 'lpTotalSpace'), | |
| 1055 (2, 'lpFreeSpace'),)) | |
| 1056 | |
| 1057 def GetDiskFreeSpaceEx_errcheck(result, func, args): | |
| 1058 if not result: | |
| 1059 raise WinError() | |
| 1060 return args[1].value | |
| 1061 GetDiskFreeSpaceEx.errcheck = GetDiskFreeSpaceEx_errcheck | |
| 1062 | |
| 1063 return GetDiskFreeSpaceEx(os.getenv('SystemDrive')) | |
| 1064 else: | |
| 1065 (_, f_frsize, _, _, f_bavail, _, _, _, _, _) = os.statvfs(path) | |
| 1066 return f_frsize * f_bavail | |
| 1067 | |
def _PerformResumableUploadIfApplies(self, fp, src_uri, dst_uri, canned_acl,
                                     headers, file_size, already_split=False):
  """
  Performs resumable upload if supported by provider and file is above
  threshold, else performs non-resumable upload.

  Args:
    fp: Open file to upload.
    src_uri: Source StorageUri (used for manifest bookkeeping).
    dst_uri: Destination StorageUri.
    canned_acl: Optional canned ACL to apply to the new object.
    headers: A copy of the top-level headers dictionary.
    file_size: Size of fp, in bytes.
    already_split: True when uploading a temporary component of a parallel
        composite upload (suppresses recording upload_id in the manifest).

  Returns (elapsed_time, bytes_transferred, version-specific dst_uri).
  """
  start_time = time.time()
  (cb, num_cb, res_upload_handler) = self._GetTransferHandlers(
      dst_uri, file_size, True)
  if dst_uri.scheme == 'gs':
    # Resumable upload protocol is Google Cloud Storage-specific.
    dst_uri.set_contents_from_file(fp, headers, policy=canned_acl,
                                   cb=cb, num_cb=num_cb,
                                   res_upload_handler=res_upload_handler)
  else:
    dst_uri.set_contents_from_file(fp, headers, policy=canned_acl,
                                   cb=cb, num_cb=num_cb)
  if res_upload_handler:
    # ResumableUploadHandler does not update upload_start_point from its
    # initial value of -1 if transferring the whole file, so clamp at 0.
    bytes_transferred = file_size - max(
        res_upload_handler.upload_start_point, 0)
    if self.use_manifest and not already_split:
      # Save the upload identifier in the manifest file, unless we're
      # uploading a temporary component for parallel composite uploads.
      self.manifest.Set(
          src_uri, 'upload_id', res_upload_handler.get_upload_id())
  else:
    # Non-resumable path: the whole file was sent in one shot.
    bytes_transferred = file_size
  end_time = time.time()
  return (end_time - start_time, bytes_transferred, dst_uri)
| 1101 | |
| 1102 def _PerformStreamingUpload(self, fp, dst_uri, headers, canned_acl=None): | |
| 1103 """ | |
| 1104 Performs a streaming upload to the cloud. | |
| 1105 | |
| 1106 Args: | |
| 1107 fp: The file whose contents to upload. | |
| 1108 dst_uri: Destination StorageUri. | |
| 1109 headers: A copy of the top-level headers dictionary. | |
| 1110 canned_acl: Optional canned ACL to set on the object. | |
| 1111 | |
| 1112 Returns (elapsed_time, bytes_transferred, version-specific dst_uri). | |
| 1113 """ | |
| 1114 start_time = time.time() | |
| 1115 | |
| 1116 cb = self._StreamCopyCallbackHandler(self.logger).call | |
| 1117 dst_uri.set_contents_from_stream( | |
| 1118 fp, headers, policy=canned_acl, cb=cb) | |
| 1119 try: | |
| 1120 bytes_transferred = fp.tell() | |
| 1121 except: | |
| 1122 bytes_transferred = 0 | |
| 1123 | |
| 1124 end_time = time.time() | |
| 1125 return (end_time - start_time, bytes_transferred, dst_uri) | |
| 1126 | |
| 1127 def _SetContentTypeHeader(self, src_uri, headers): | |
| 1128 """ | |
| 1129 Sets content type header to value specified in '-h Content-Type' option (if | |
| 1130 specified); else sets using Content-Type detection. | |
| 1131 """ | |
| 1132 if 'content-type' in headers: | |
| 1133 # If empty string specified (i.e., -h "Content-Type:") set header to None, | |
| 1134 # which will inhibit boto from sending the CT header. Otherwise, boto will | |
| 1135 # pass through the user specified CT header. | |
| 1136 if not headers['content-type']: | |
| 1137 headers['content-type'] = None | |
| 1138 # else we'll keep the value passed in via -h option (not performing | |
| 1139 # content type detection). | |
| 1140 else: | |
| 1141 # Only do content type recognition is src_uri is a file. Object-to-object | |
| 1142 # copies with no -h Content-Type specified re-use the content type of the | |
| 1143 # source object. | |
| 1144 if src_uri.is_file_uri(): | |
| 1145 object_name = src_uri.object_name | |
| 1146 content_type = None | |
| 1147 # Streams (denoted by '-') are expected to be 'application/octet-stream' | |
| 1148 # and 'file' would partially consume them. | |
| 1149 if object_name != '-': | |
| 1150 if self.USE_MAGICFILE: | |
| 1151 p = subprocess.Popen(['file', '--mime-type', object_name], | |
| 1152 stdout=subprocess.PIPE, stderr=subprocess.PIPE) | |
| 1153 output, error = p.communicate() | |
| 1154 if p.returncode != 0 or error: | |
| 1155 raise CommandException( | |
| 1156 'Encountered error running "file --mime-type %s" ' | |
| 1157 '(returncode=%d).\n%s' % (object_name, p.returncode, error)) | |
| 1158 # Parse output by removing line delimiter and splitting on last ": | |
| 1159 content_type = output.rstrip().rpartition(': ')[2] | |
| 1160 else: | |
| 1161 content_type = mimetypes.guess_type(object_name)[0] | |
| 1162 if not content_type: | |
| 1163 content_type = self.DEFAULT_CONTENT_TYPE | |
| 1164 headers['content-type'] = content_type | |
| 1165 | |
def _GetFileSize(self, fp):
  """Returns the size in bytes of the file underlying fp.

  Handles both a KeyFile wrapper around a cloud Key and a regular open file.

  Args:
    fp: The file whose size we wish to determine.

  Returns:
    The size of the file, in bytes.
  """
  # A KeyFile wraps a cloud Key whose size is already known; anything else
  # is a real local file that can be stat'ed by name.
  if isinstance(fp, KeyFile):
    return fp.getkey().size
  return os.path.getsize(fp.name)
| 1180 | |
def _UploadFileToObject(self, src_key, src_uri, dst_uri, headers,
                        should_log=True, allow_splitting=True):
  """Uploads a local file to an object.

  Args:
    src_key: Source StorageUri. Must be a file URI.
    src_uri: Source StorageUri.
    dst_uri: Destination StorageUri.
    headers: The headers dictionary.
    should_log: bool indicator whether we should log this operation.
    allow_splitting: bool indicator whether the file may be split into
        components for a parallel composite upload.

  Returns:
    (elapsed_time, bytes_transferred, version-specific dst_uri), excluding
    overhead like initial HEAD.

  Raises:
    CommandException: if errors encountered.
  """
  gzip_exts = []
  canned_acl = None
  # Parse the cp sub-options relevant to uploads: -a (canned ACL),
  # -t (deprecated no-op), -z (extensions to gzip-compress).
  if self.sub_opts:
    for o, a in self.sub_opts:
      if o == '-a':
        canned_acls = dst_uri.canned_acls()
        if a not in canned_acls:
          raise CommandException('Invalid canned ACL "%s".' % a)
        canned_acl = a
      elif o == '-t':
        self.logger.warning(
            'Warning: -t is deprecated, and will be removed in the future. '
            'Content type\ndetection is '
            'now performed by default, unless inhibited by specifying '
            'a\nContent-Type header via the -h option.')
      elif o == '-z':
        gzip_exts = a.split(',')

  self._SetContentTypeHeader(src_uri, headers)
  if should_log:
    self._LogCopyOperation(src_uri, dst_uri, headers)

  # Apply the configured default content-language unless one was supplied.
  if 'content-language' not in headers:
    content_language = config.get_value('GSUtil', 'content_language')
    if content_language:
      headers['content-language'] = content_language

  fname_parts = src_uri.object_name.split('.')
  if len(fname_parts) > 1 and fname_parts[-1] in gzip_exts:
    # -z matched this file's extension: compress to a temp file and upload
    # that with Content-Encoding: gzip.
    self.logger.debug('Compressing %s (to tmp)...', src_key)
    (gzip_fh, gzip_path) = tempfile.mkstemp()
    gzip_fp = None
    try:
      # Check for temp space. Assume the compressed object is at most 2x
      # the size of the object (normally should compress to smaller than
      # the object)
      if (self._CheckFreeSpace(gzip_path)
          < 2*int(os.path.getsize(src_key.name))):
        raise CommandException('Inadequate temp space available to compress '
                               '%s' % src_key.name)
      gzip_fp = gzip.open(gzip_path, 'wb')
      gzip_fp.writelines(src_key.fp)
    finally:
      if gzip_fp:
        gzip_fp.close()
      os.close(gzip_fh)
    headers['content-encoding'] = 'gzip'
    gzip_fp = open(gzip_path, 'rb')
    try:
      file_size = self._GetFileSize(gzip_fp)
      (elapsed_time, bytes_transferred, result_uri) = (
          self._PerformResumableUploadIfApplies(gzip_fp, src_uri, dst_uri,
                                                canned_acl, headers,
                                                file_size))
    finally:
      gzip_fp.close()
      try:
        os.unlink(gzip_path)
      # Windows sometimes complains the temp file is locked when you try to
      # delete it.
      except Exception, e:
        pass
  elif (src_key.is_stream()
        and dst_uri.get_provider().supports_chunked_transfer()):
    # Streaming source and the provider supports chunked transfer: upload
    # straight from the stream.
    (elapsed_time, bytes_transferred, result_uri) = (
        self._PerformStreamingUpload(src_key.fp, dst_uri, headers,
                                     canned_acl))
  else:
    if src_key.is_stream():
      # For Providers that don't support chunked Transfers: buffer the
      # whole stream into a named temp file, then upload that file.
      tmp = tempfile.NamedTemporaryFile()
      file_uri = self.suri_builder.StorageUri('file://%s' % tmp.name)
      try:
        file_uri.new_key(False, headers).set_contents_from_file(
            src_key.fp, headers)
        src_key = file_uri.get_key()
      finally:
        file_uri.close()
    try:
      fp = src_key.fp
      file_size = self._GetFileSize(fp)
      # Large files on capable destinations may be split into components
      # and uploaded in parallel, then composed server-side.
      if self._ShouldDoParallelCompositeUpload(allow_splitting, src_key,
                                               dst_uri, file_size):
        (elapsed_time, bytes_transferred, result_uri) = (
            self._DoParallelCompositeUpload(fp, src_uri, dst_uri, headers,
                                            canned_acl, file_size))
      else:
        (elapsed_time, bytes_transferred, result_uri) = (
            self._PerformResumableUploadIfApplies(
                src_key.fp, src_uri, dst_uri, canned_acl, headers,
                self._GetFileSize(src_key.fp)))
    finally:
      if src_key.is_stream():
        tmp.close()
      else:
        src_key.close()
  return (elapsed_time, bytes_transferred, result_uri)
| 1295 | |
def _GetHashAlgs(self, key):
  """Returns hash algorithms usable to validate a download of key.

  Honors the boto 'check_hashes' config value ('never', 'always',
  'if_fast_else_fail', 'if_fast_else_skip').

  Args:
    key: The Key being downloaded.

  Returns:
    Dict mapping hash name ('md5'/'crc32c') to a zero-arg digester factory;
    empty when hash checking is disabled or no server hash is available.

  Raises:
    CommandException: if the check_hashes config value is unrecognized, or
        (via the predefined exception constants) when policy forbids
        continuing without a fast hash.
  """
  hash_algs = {}
  check_hashes_config = config.get(
      'GSUtil', 'check_hashes', 'if_fast_else_fail')
  if check_hashes_config == 'never':
    return hash_algs
  if self._GetMD5FromETag(key):
    # The object's ETag carries its MD5, so MD5 validation is possible.
    hash_algs['md5'] = md5
  if hasattr(key, 'cloud_hashes') and key.cloud_hashes:
    if 'md5' in key.cloud_hashes:
      hash_algs['md5'] = md5
    # If the cloud provider supplies a CRC, we'll compute a checksum to
    # validate if we're using a native crcmod installation or MD5 isn't
    # offered as an alternative.
    if 'crc32c' in key.cloud_hashes:
      if UsingCrcmodExtension(crcmod):
        hash_algs['crc32c'] = lambda: crcmod.predefined.Crc('crc-32c')
      elif not hash_algs:
        # Only the slow pure-Python CRC is available and there's no MD5
        # fallback; what happens next depends on the check_hashes policy.
        if check_hashes_config == 'if_fast_else_fail':
          raise SLOW_CRC_EXCEPTION
        elif check_hashes_config == 'if_fast_else_skip':
          sys.stderr.write(NO_HASH_CHECK_WARNING)
        elif check_hashes_config == 'always':
          sys.stderr.write(SLOW_CRC_WARNING)
          hash_algs['crc32c'] = lambda: crcmod.predefined.Crc('crc-32c')
        else:
          raise CommandException(
              'Your boto config \'check_hashes\' option is misconfigured.')
  elif not hash_algs:
    # The provider supplied no usable hash at all.
    if check_hashes_config == 'if_fast_else_skip':
      sys.stderr.write(NO_SERVER_HASH_WARNING)
    else:
      raise NO_SERVER_HASH_EXCEPTION
  return hash_algs
| 1330 | |
def _DownloadObjectToFile(self, src_key, src_uri, dst_uri, headers,
                          should_log=True):
  """Downloads an object to a local file.

  Args:
    src_key: Source Key.
    src_uri: Source StorageUri.
    dst_uri: Destination StorageUri.
    headers: The headers dictionary.
    should_log: bool indicator whether we should log this operation.
  Returns:
    (elapsed_time, bytes_transferred, dst_uri), excluding overhead like
    initial HEAD.

  Raises:
    CommandException: if errors encountered.
  """
  if should_log:
    self._LogCopyOperation(src_uri, dst_uri, headers)
  (cb, num_cb, res_download_handler) = self._GetTransferHandlers(
      dst_uri, src_key.size, False)
  file_name = dst_uri.object_name
  dir_name = os.path.dirname(file_name)
  if dir_name and not os.path.exists(dir_name):
    # Do dir creation in try block so can ignore case where dir already
    # exists. This is needed to avoid a race condition when running gsutil
    # -m cp.
    try:
      os.makedirs(dir_name)
    except OSError, e:
      if e.errno != errno.EEXIST:
        raise
  # For gzipped objects, download to a temp file and unzip.
  if (hasattr(src_key, 'content_encoding')
      and src_key.content_encoding == 'gzip'):
    # We can't use tempfile.mkstemp() here because we need a predictable
    # filename for resumable downloads.
    download_file_name = '%s_.gztmp' % file_name
    need_to_unzip = True
  else:
    download_file_name = file_name
    need_to_unzip = False

  hash_algs = self._GetHashAlgs(src_key)

  # Add accept encoding for download operation.
  AddAcceptEncoding(headers)

  fp = None
  try:
    if res_download_handler:
      # Append mode so a partially-downloaded file can be resumed in place.
      fp = open(download_file_name, 'ab')
    else:
      fp = open(download_file_name, 'wb')
    start_time = time.time()
    # Use our hash_algs if get_contents_to_file() will accept them, else the
    # default (md5-only) will suffice.
    try:
      src_key.get_contents_to_file(fp, headers, cb=cb, num_cb=num_cb,
                                   res_download_handler=res_download_handler,
                                   hash_algs=hash_algs)
    except TypeError:
      # Older boto without the hash_algs parameter.
      src_key.get_contents_to_file(fp, headers, cb=cb, num_cb=num_cb,
                                   res_download_handler=res_download_handler)

    # If a custom test method is defined, call it here. For the copy command,
    # test methods are expected to take one argument: an open file pointer,
    # and are used to perturb the open file during download to exercise
    # download error detection.
    if self.test_method:
      self.test_method(fp)
    end_time = time.time()
  finally:
    if fp:
      fp.close()

  if (not need_to_unzip and
      hasattr(src_key, 'content_encoding')
      and src_key.content_encoding == 'gzip'):
    # TODO: HEAD requests are currently not returning proper Content-Encoding
    # headers when an object is gzip-encoded on-the-fly. Remove this once
    # it's fixed.
    renamed_file_name = '%s_.gztmp' % file_name
    os.rename(download_file_name, renamed_file_name)
    download_file_name = renamed_file_name
    need_to_unzip = True

  # Discard all hashes if we are resuming a partial download.
  if res_download_handler and res_download_handler.download_start_point:
    src_key.local_hashes = {}

  # Verify downloaded file checksum matched source object's checksum.
  digest_verified = True
  computed_hashes = None
  try:
    self._CheckHashes(src_key, download_file_name, hash_algs)
  except CommandException, e:
    # If the digest doesn't match, we'll try checking it again after
    # unzipping.
    if (not need_to_unzip or
        'doesn\'t match cloud-supplied digest' not in str(e)):
      os.unlink(download_file_name)
      raise
    digest_verified = False
    # Fresh digesters to recompute over the uncompressed bytes below.
    computed_hashes = dict(
        (alg, digester())
        for alg, digester in self._GetHashAlgs(src_key).iteritems())

  if res_download_handler:
    # Only the bytes actually fetched in this run count as transferred.
    bytes_transferred = (
        src_key.size - res_download_handler.download_start_point)
  else:
    bytes_transferred = src_key.size

  if need_to_unzip:
    # Log that we're uncompressing if the file is big enough that
    # decompressing would make it look like the transfer "stalled" at the end.
    if bytes_transferred > 10 * 1024 * 1024:
      self.logger.info('Uncompressing downloaded tmp file to %s...',
                       file_name)

    # Downloaded gzipped file to a filename w/o .gz extension, so unzip.
    f_in = gzip.open(download_file_name, 'rb')
    with open(file_name, 'wb') as f_out:
      data = f_in.read(self.GUNZIP_CHUNK_SIZE)
      while data:
        f_out.write(data)
        if computed_hashes:
          # Compute digests again on the uncompressed data.
          for alg in computed_hashes.itervalues():
            alg.update(data)
        data = f_in.read(self.GUNZIP_CHUNK_SIZE)
    f_in.close()

    os.unlink(download_file_name)

  if not digest_verified:
    # Re-check using the digests computed over the uncompressed stream.
    computed_hashes = dict((alg, digester.digest())
                           for alg, digester in computed_hashes.iteritems())
    try:
      self._CheckHashes(
          src_key, file_name, hash_algs, computed_hashes=computed_hashes)
    except CommandException, e:
      os.unlink(file_name)
      raise

  return (end_time - start_time, bytes_transferred, dst_uri)
| 1478 | |
| 1479 def _PerformDownloadToStream(self, src_key, src_uri, str_fp, headers): | |
| 1480 (cb, num_cb, res_download_handler) = self._GetTransferHandlers( | |
| 1481 src_uri, src_key.size, False) | |
| 1482 start_time = time.time() | |
| 1483 src_key.get_contents_to_file(str_fp, headers, cb=cb, num_cb=num_cb) | |
| 1484 end_time = time.time() | |
| 1485 bytes_transferred = src_key.size | |
| 1486 end_time = time.time() | |
| 1487 return (end_time - start_time, bytes_transferred) | |
| 1488 | |
| 1489 def _CopyFileToFile(self, src_key, src_uri, dst_uri, headers): | |
| 1490 """Copies a local file to a local file. | |
| 1491 | |
| 1492 Args: | |
| 1493 src_key: Source StorageUri. Must be a file URI. | |
| 1494 src_uri: Source StorageUri. | |
| 1495 dst_uri: Destination StorageUri. | |
| 1496 headers: The headers dictionary. | |
| 1497 Returns: | |
| 1498 (elapsed_time, bytes_transferred, dst_uri), excluding | |
| 1499 overhead like initial HEAD. | |
| 1500 | |
| 1501 Raises: | |
| 1502 CommandException: if errors encountered. | |
| 1503 """ | |
| 1504 self._LogCopyOperation(src_uri, dst_uri, headers) | |
| 1505 dst_key = dst_uri.new_key(False, headers) | |
| 1506 start_time = time.time() | |
| 1507 dst_key.set_contents_from_file(src_key.fp, headers) | |
| 1508 end_time = time.time() | |
| 1509 return (end_time - start_time, os.path.getsize(src_key.fp.name), dst_uri) | |
| 1510 | |
def _CopyObjToObjDaisyChainMode(self, src_key, src_uri, dst_uri, headers):
  """Copies from src_uri to dst_uri in "daisy chain" mode.
  See -D OPTION documentation about what daisy chain mode is.

  Args:
    src_key: Source Key.
    src_uri: Source StorageUri.
    dst_uri: Destination StorageUri.
    headers: A copy of the top-level headers dictionary.

  Returns:
    (elapsed_time, bytes_transferred, version-specific dst_uri) excluding
    overhead like initial HEAD.

  Raises:
    CommandException: if errors encountered.
  """
  # Start with copy of input headers, so we'll include any headers that need
  # to be set from higher up in call stack (like x-goog-if-generation-match).
  headers = headers.copy()
  # Now merge headers from src_key so we'll preserve metadata.
  # Unfortunately boto separates headers into ones it puts in the metadata
  # dict and ones it pulls out into specific key fields, so we need to walk
  # through the latter list to find the headers that we copy over to the dest
  # object.
  for header_name, field_name in (
      ('cache-control', 'cache_control'),
      ('content-type', 'content_type'),
      ('content-language', 'content_language'),
      ('content-encoding', 'content_encoding'),
      ('content-disposition', 'content_disposition')):
    value = getattr(src_key, field_name, None)
    if value:
      headers[header_name] = value
  # Boto represents x-goog-meta-* headers in metadata dict with the
  # x-goog-meta- or x-amx-meta- prefix stripped. Turn these back into headers
  # for the destination object.
  for name, value in src_key.metadata.items():
    header_name = '%smeta-%s' % (dst_uri.get_provider().header_prefix, name)
    headers[header_name] = value
  # Set content type if specified in '-h Content-Type' option.
  self._SetContentTypeHeader(src_uri, headers)
  self._LogCopyOperation(src_uri, dst_uri, headers)
  (preserve_acl, canned_acl, headers) = (
      self._ProcessCopyObjectToObjectOptions(dst_uri, headers))
  if preserve_acl:
    if src_uri.get_provider() != dst_uri.get_provider():
      # We don't attempt to preserve ACLs across providers because
      # GCS and S3 support different ACLs and disjoint principals.
      raise NotImplementedError('Cross-provider cp -p not supported')
    # We need to read and write the ACL manually because the
    # Key.set_contents_from_file() API doesn't provide a preserve_acl
    # parameter (unlike the Bucket.copy_key() API used
    # by_CopyObjToObjInTheCloud).
    acl = src_uri.get_acl(headers=headers)
  # Stream the source object through this process into the destination.
  fp = KeyFile(src_key)
  result = self._PerformResumableUploadIfApplies(fp, src_uri,
                                                 dst_uri, canned_acl, headers,
                                                 self._GetFileSize(fp))
  if preserve_acl:
    # If user specified noclobber flag, we need to remove the
    # x-goog-if-generation-match:0 header that was set when uploading the
    # object, because that precondition would fail when updating the ACL on
    # the now-existing object.
    if self.no_clobber:
      del headers['x-goog-if-generation-match']
    # Remove the owner field from the ACL in case we're copying from an object
    # that is owned by a different user. If we left that other user in the
    # ACL, attempting to set the ACL would result in a 400 (Bad Request).
    if hasattr(acl, 'owner'):
      del acl.owner
    dst_uri.set_acl(acl, dst_uri.object_name, headers=headers)
  return result
| 1584 | |
  def _PerformCopy(self, src_uri, dst_uri, allow_splitting=True):
    """Performs copy from src_uri to dst_uri, handling various special cases.

    Args:
      src_uri: Source StorageUri.
      dst_uri: Destination StorageUri.
      allow_splitting: Whether to allow the file to be split into component
                       pieces for a parallel composite upload.
                       NOTE(review): not referenced in this method body --
                       confirm it is consumed by a callee.

    Returns:
      (elapsed_time, bytes_transferred, version-specific dst_uri) excluding
      overhead like initial HEAD.

    Raises:
      CommandException: if errors encountered.
    """
    # Make a copy of the input headers each time so we can set a different
    # content type for each object.
    headers = self.headers.copy() if self.headers else {}
    download_headers = headers.copy()
    # Add accept encoding for download operation.
    AddAcceptEncoding(download_headers)

    src_key = src_uri.get_key(False, download_headers)
    if not src_key:
      raise CommandException('"%s" does not exist.' % src_uri)

    if self.use_manifest:
      # Set the source size in the manifest.
      self.manifest.Set(src_uri, 'size', getattr(src_key, 'size', None))

    # On Windows, stdin is opened as text mode instead of binary which causes
    # problems when piping a binary file, so this switches it to binary mode.
    if IS_WINDOWS and src_uri.is_file_uri() and src_key.is_stream():
      import msvcrt
      msvcrt.setmode(src_key.fp.fileno(), os.O_BINARY)

    if self.no_clobber:
      # There are two checks to prevent clobbering:
      # 1) The first check is to see if the item
      #    already exists at the destination and prevent the upload/download
      #    from happening. This is done by the exists() call.
      # 2) The second check is only relevant if we are writing to gs. We can
      #    enforce that the server only writes the object if it doesn't exist
      #    by specifying the header below. This check only happens at the
      #    server after the complete file has been uploaded. We specify this
      #    header to prevent a race condition where a destination file may
      #    be created after the first check and before the file is fully
      #    uploaded.
      # In order to save on unnecessary uploads/downloads we perform both
      # checks. However, this may come at the cost of additional HTTP calls.
      if dst_uri.exists(download_headers):
        if dst_uri.is_file_uri():
          # The local file may be a partial. Check the file sizes.
          if src_key.size == dst_uri.get_key(headers=download_headers).size:
            raise ItemExistsError()
        else:
          raise ItemExistsError()
      if dst_uri.is_cloud_uri() and dst_uri.scheme == 'gs':
        headers['x-goog-if-generation-match'] = '0'

    # Dispatch on the four src/dst location combinations. Cloud-to-cloud
    # within the same provider goes server-side unless daisy-chain was
    # requested; cross-provider copies are daisy-chained through this host.
    if src_uri.is_cloud_uri() and dst_uri.is_cloud_uri():
      if src_uri.scheme == dst_uri.scheme and not self.daisy_chain:
        return self._CopyObjToObjInTheCloud(src_key, src_uri, dst_uri, headers)
      else:
        return self._CopyObjToObjDaisyChainMode(src_key, src_uri, dst_uri,
                                                headers)
    elif src_uri.is_file_uri() and dst_uri.is_cloud_uri():
      return self._UploadFileToObject(src_key, src_uri, dst_uri,
                                      download_headers)
    elif src_uri.is_cloud_uri() and dst_uri.is_file_uri():
      return self._DownloadObjectToFile(src_key, src_uri, dst_uri,
                                        download_headers)
    elif src_uri.is_file_uri() and dst_uri.is_file_uri():
      return self._CopyFileToFile(src_key, src_uri, dst_uri, download_headers)
    else:
      raise CommandException('Unexpected src/dest case')
| 1662 | |
| 1663 def _PartitionFile(self, fp, file_size, src_uri, headers, canned_acl, bucket, | |
| 1664 random_prefix, tracker_file, tracker_file_lock): | |
| 1665 """Partitions a file into FilePart objects to be uploaded and later composed | |
| 1666 into an object matching the original file. This entails splitting the | |
| 1667 file into parts, naming and forming a destination URI for each part, | |
| 1668 and also providing the PerformResumableUploadIfAppliesArgs object | |
| 1669 corresponding to each part. | |
| 1670 | |
| 1671 Args: | |
| 1672 fp: The file object to be partitioned. | |
| 1673 file_size: The size of fp, in bytes. | |
| 1674 src_uri: The source StorageUri fromed from the original command. | |
| 1675 headers: The headers which ultimately passed to boto. | |
| 1676 canned_acl: The user-provided canned_acl, if applicable. | |
| 1677 bucket: The name of the destination bucket, of the form gs://bucket | |
| 1678 random_prefix: The randomly-generated prefix used to prevent collisions | |
| 1679 among the temporary component names. | |
| 1680 tracker_file: The path to the parallel composite upload tracker file. | |
| 1681 tracker_file_lock: The lock protecting access to the tracker file. | |
| 1682 | |
| 1683 Returns: | |
| 1684 dst_args: The destination URIs for the temporary component objects. | |
| 1685 """ | |
| 1686 parallel_composite_upload_component_size = HumanReadableToBytes( | |
| 1687 boto.config.get('GSUtil', 'parallel_composite_upload_component_size', | |
| 1688 DEFAULT_PARALLEL_COMPOSITE_UPLOAD_COMPONENT_SIZE)) | |
| 1689 (num_components, component_size) = _GetPartitionInfo(file_size, | |
| 1690 MAX_COMPOSE_ARITY, parallel_composite_upload_component_size) | |
| 1691 | |
| 1692 # Make sure that the temporary objects don't already exist. | |
| 1693 tmp_object_headers = copy.deepcopy(headers) | |
| 1694 tmp_object_headers['x-goog-if-generation-match'] = '0' | |
| 1695 | |
| 1696 uri_strs = [] # Used to create a NameExpansionIterator. | |
| 1697 dst_args = {} # Arguments to create commands and pass to subprocesses. | |
| 1698 file_names = [] # Used for the 2-step process of forming dst_args. | |
| 1699 for i in range(num_components): | |
| 1700 # "Salt" the object name with something a user is very unlikely to have | |
| 1701 # used in an object name, then hash the extended name to make sure | |
| 1702 # we don't run into problems with name length. Using a deterministic | |
| 1703 # naming scheme for the temporary components allows users to take | |
| 1704 # advantage of resumable uploads for each component. | |
| 1705 encoded_name = (PARALLEL_UPLOAD_STATIC_SALT + fp.name).encode('utf-8') | |
| 1706 content_md5 = md5() | |
| 1707 content_md5.update(encoded_name) | |
| 1708 digest = content_md5.hexdigest() | |
| 1709 temp_file_name = (random_prefix + PARALLEL_UPLOAD_TEMP_NAMESPACE + | |
| 1710 digest + '_' + str(i)) | |
| 1711 tmp_dst_uri = MakeGsUri(bucket, temp_file_name, self.suri_builder) | |
| 1712 | |
| 1713 if i < (num_components - 1): | |
| 1714 # Every component except possibly the last is the same size. | |
| 1715 file_part_length = component_size | |
| 1716 else: | |
| 1717 # The last component just gets all of the remaining bytes. | |
| 1718 file_part_length = (file_size - ((num_components -1) * component_size)) | |
| 1719 offset = i * component_size | |
| 1720 func_args = PerformResumableUploadIfAppliesArgs( | |
| 1721 fp.name, offset, file_part_length, src_uri, tmp_dst_uri, canned_acl, | |
| 1722 headers, tracker_file, tracker_file_lock) | |
| 1723 file_names.append(temp_file_name) | |
| 1724 dst_args[temp_file_name] = func_args | |
| 1725 uri_strs.append(self._MakeGsUriStr(bucket, temp_file_name)) | |
| 1726 | |
| 1727 return dst_args | |
| 1728 | |
| 1729 def _MakeFileUri(self, filename): | |
| 1730 """Returns a StorageUri for a local file.""" | |
| 1731 return self.suri_builder.StorageUri(filename) | |
| 1732 | |
| 1733 def _MakeGsUriStr(self, bucket, filename): | |
| 1734 """Returns a string of the form gs://bucket/filename, used to indicate an | |
| 1735 object in Google Cloud Storage. | |
| 1736 """ | |
| 1737 return 'gs://' + bucket + '/' + filename | |
| 1738 | |
  def _DoParallelCompositeUpload(self, fp, src_uri, dst_uri, headers,
                                 canned_acl, file_size):
    """Uploads a local file to an object in the cloud for the Parallel Composite
    Uploads feature. The file is partitioned into parts, and then the parts
    are uploaded in parallel, composed to form the original destination
    object, and deleted.

    Args:
      fp: The file object to be uploaded.
      src_uri: The StorageURI of the local file.
      dst_uri: The StorageURI of the destination file.
      headers: The headers to pass to boto, if any.
      canned_acl: The canned acl to apply to the object, if any.
      file_size: The size of the source file in bytes.

    Returns:
      (elapsed_time, total_bytes_uploaded, result_uri) for the composed
      upload.

    Raises:
      CommandException: if one or more components failed to upload.
    """
    start_time = time.time()
    gs_prefix = 'gs://'
    bucket = gs_prefix + dst_uri.bucket_name
    # Drop an empty content-type header, if one is present.
    if 'content-type' in headers and not headers['content-type']:
      del headers['content-type']

    # Determine which components, if any, have already been successfully
    # uploaded.
    tracker_file = self._GetTrackerFilePath(dst_uri,
                                            TrackerFileType.PARALLEL_UPLOAD,
                                            src_uri)
    tracker_file_lock = CreateLock()
    (random_prefix, existing_components) = (
        _ParseParallelUploadTrackerFile(tracker_file, tracker_file_lock))

    # Create the initial tracker file for the upload.
    _CreateParallelUploadTrackerFile(tracker_file, random_prefix,
                                     existing_components, tracker_file_lock)

    # Get the set of all components that should be uploaded.
    dst_args = self._PartitionFile(fp, file_size, src_uri, headers, canned_acl,
                                   bucket, random_prefix, tracker_file,
                                   tracker_file_lock)

    (components_to_upload, existing_components, existing_objects_to_delete) = (
        FilterExistingComponents(dst_args, existing_components, bucket,
                                 self.suri_builder))

    # In parallel, copy all of the file parts that haven't already been
    # uploaded to temporary objects.
    cp_results = self.Apply(_PerformResumableUploadIfAppliesWrapper,
                            components_to_upload,
                            _CopyExceptionHandler,
                            ('copy_failure_count', 'total_bytes_transferred'),
                            arg_checker=gslib.command.DummyArgChecker,
                            parallel_operations_override=True,
                            should_return_results=True)
    # Each cp_result is indexed as (elapsed_time, bytes, uri) -- tally bytes
    # and collect the component URIs for composition.
    uploaded_components = []
    total_bytes_uploaded = 0
    for cp_result in cp_results:
      total_bytes_uploaded += cp_result[1]
      uploaded_components.append(cp_result[2])
    components = uploaded_components + existing_components

    if len(components) == len(dst_args):
      # Only try to compose if all of the components were uploaded successfully.

      # Sort the components so that they will be composed in the correct order.
      # Component names carry their index after the final '_' (see
      # _PartitionFile).
      components = sorted(
          components, key=lambda component:
          int(component.object_name[component.object_name.rfind('_')+1:]))
      result_uri = dst_uri.compose(components, headers=headers)

      try:
        # Make sure only to delete things that we know were successfully
        # uploaded (as opposed to all of the objects that we attempted to
        # create) so that we don't delete any preexisting objects, except for
        # those that were uploaded by a previous, failed run and have since
        # changed (but still have an old generation lying around).
        objects_to_delete = components + existing_objects_to_delete
        self.Apply(_DeleteKeyFn, objects_to_delete, _RmExceptionHandler,
                   arg_checker=gslib.command.DummyArgChecker,
                   parallel_operations_override=True)
      except Exception, e:
        if (e.message and ('unexpected failure in' in e.message)
            and ('sub-processes, aborting' in e.message)):
          # If some of the delete calls fail, don't cause the whole command to
          # fail. The copy was successful iff the compose call succeeded, so
          # just raise whatever exception (if any) happened before this instead,
          # and reduce this to a warning.
          logging.warning(
              'Failed to delete some of the following temporary objects:\n' +
              '\n'.join(dst_args.keys()))
        else:
          raise e
      finally:
        with tracker_file_lock:
          if os.path.exists(tracker_file):
            os.unlink(tracker_file)
    else:
      # Some of the components failed to upload. In this case, we want to exit
      # without deleting the objects.
      raise CommandException(
          'Some temporary components were not uploaded successfully. '
          'Please retry this upload.')

    # result_uri is always bound here: the only path that skips the compose
    # branch raises CommandException above.
    return (time.time() - start_time, total_bytes_uploaded, result_uri)
| 1841 | |
| 1842 def _ShouldDoParallelCompositeUpload(self, allow_splitting, src_key, dst_uri, | |
| 1843 file_size): | |
| 1844 """Returns True iff a parallel upload should be performed on the source key. | |
| 1845 | |
| 1846 Args: | |
| 1847 allow_splitting: If false, then this function returns false. | |
| 1848 src_key: Corresponding to a local file. | |
| 1849 dst_uri: Corresponding to an object in the cloud. | |
| 1850 file_size: The size of the source file, in bytes. | |
| 1851 """ | |
| 1852 parallel_composite_upload_threshold = HumanReadableToBytes(boto.config.get( | |
| 1853 'GSUtil', 'parallel_composite_upload_threshold', | |
| 1854 DEFAULT_PARALLEL_COMPOSITE_UPLOAD_THRESHOLD)) | |
| 1855 return (allow_splitting # Don't split the pieces multiple times. | |
| 1856 and not src_key.is_stream() # We can't partition streams. | |
| 1857 and dst_uri.scheme == 'gs' # Compose is only for gs. | |
| 1858 and parallel_composite_upload_threshold > 0 | |
| 1859 and file_size >= parallel_composite_upload_threshold | |
| 1860 and file_size >= MIN_PARALLEL_COMPOSITE_FILE_SIZE) | |
| 1861 | |
  def _ExpandDstUri(self, dst_uri_str):
    """
    Expands wildcard if present in dst_uri_str.

    Args:
      dst_uri_str: String representation of requested dst_uri.

    Returns:
      (exp_dst_uri, have_existing_dst_container)
      where have_existing_dst_container is a bool indicating whether
      exp_dst_uri names an existing directory, bucket, or bucket subdirectory.

    Raises:
      CommandException: if dst_uri_str matched more than 1 URI.
    """
    dst_uri = self.suri_builder.StorageUri(dst_uri_str)

    # Handle wildcarded dst_uri case.
    if ContainsWildcard(dst_uri):
      blr_expansion = list(self.WildcardIterator(dst_uri))
      if len(blr_expansion) != 1:
        raise CommandException('Destination (%s) must match exactly 1 URI' %
                               dst_uri_str)
      blr = blr_expansion[0]
      uri = blr.GetUri()
      if uri.is_cloud_uri():
        # A cloud match counts as an existing container if it's a bucket, a
        # prefix, or an object whose name ends in '/'.
        return (uri, uri.names_bucket() or blr.HasPrefix()
                or blr.GetKey().name.endswith('/'))
      else:
        return (uri, uri.names_directory())

    # Handle non-wildcarded dst_uri:
    if dst_uri.is_file_uri():
      return (dst_uri, dst_uri.names_directory())
    if dst_uri.names_bucket():
      return (dst_uri, True)
    # For object URIs check 3 cases: (a) if the name ends with '/' treat as a
    # subdir; else, perform a wildcard expansion with dst_uri + "*" and then
    # find if (b) there's a Prefix matching dst_uri, or (c) name is of form
    # dir_$folder$ (and in both these cases also treat dir as a subdir).
    if dst_uri.is_cloud_uri() and dst_uri_str.endswith('/'):
      return (dst_uri, True)
    blr_expansion = list(self.WildcardIterator(
        '%s*' % dst_uri_str.rstrip(dst_uri.delim)))
    for blr in blr_expansion:
      if blr.GetRStrippedUriString().endswith('_$folder$'):
        return (dst_uri, True)
      if blr.GetRStrippedUriString() == dst_uri_str.rstrip(dst_uri.delim):
        return (dst_uri, blr.HasPrefix())
    return (dst_uri, False)
| 1912 | |
  def _ConstructDstUri(self, src_uri, exp_src_uri,
                       src_uri_names_container, src_uri_expands_to_multi,
                       have_multiple_srcs, exp_dst_uri,
                       have_existing_dest_subdir):
    """
    Constructs the destination URI for a given exp_src_uri/exp_dst_uri pair,
    using context-dependent naming rules that mimic Linux cp and mv behavior.

    Args:
      src_uri: src_uri to be copied.
      exp_src_uri: Single StorageUri from wildcard expansion of src_uri.
      src_uri_names_container: True if src_uri names a container (including the
          case of a wildcard-named bucket subdir (like gs://bucket/abc,
          where gs://bucket/abc/* matched some objects). Note that this is
          additional semantics that src_uri.names_container() doesn't
          understand because the latter only understands StorageUris, not
          wildcards.
      src_uri_expands_to_multi: True if src_uri expanded to multiple URIs.
      have_multiple_srcs: True if this is a multi-source request. This can be
          true if src_uri wildcard-expanded to multiple URIs or if there were
          multiple source URIs in the request.
      exp_dst_uri: the expanded StorageUri requested for the cp destination.
          Final written path is constructed from this plus a context-dependent
          variant of src_uri.
      have_existing_dest_subdir: bool indicator whether dest is an existing
          subdirectory.

    Returns:
      StorageUri to use for copy.

    Raises:
      CommandException if destination object name not specified for
      source and source is a stream.
    """
    if self._ShouldTreatDstUriAsSingleton(
        have_multiple_srcs, have_existing_dest_subdir, exp_dst_uri):
      # We're copying one file or object to one file or object.
      return exp_dst_uri

    if exp_src_uri.is_stream():
      if exp_dst_uri.names_container():
        raise CommandException('Destination object name needed when '
                               'source is a stream')
      return exp_dst_uri

    if not self.recursion_requested and not have_multiple_srcs:
      # We're copying one file or object to a subdirectory. Append final comp
      # of exp_src_uri to exp_dst_uri.
      src_final_comp = exp_src_uri.object_name.rpartition(src_uri.delim)[-1]
      return self.suri_builder.StorageUri('%s%s%s' % (
          exp_dst_uri.uri.rstrip(exp_dst_uri.delim), exp_dst_uri.delim,
          src_final_comp))

    # Else we're copying multiple sources to a directory, bucket, or a bucket
    # "sub-directory".

    # Ensure exp_dst_uri ends in delim char if we're doing a multi-src copy or
    # a copy to a directory. (The check for copying to a directory needs
    # special-case handling so that the command:
    #   gsutil cp gs://bucket/obj dir
    # will turn into file://dir/ instead of file://dir -- the latter would
    # cause the file "dirobj" to be created.)
    # Note: need to check have_multiple_srcs or src_uri.names_container()
    # because src_uri could be a bucket containing a single object, named
    # as gs://bucket.
    if ((have_multiple_srcs or src_uri.names_container()
         or os.path.isdir(exp_dst_uri.object_name))
        and not exp_dst_uri.uri.endswith(exp_dst_uri.delim)):
      exp_dst_uri = exp_dst_uri.clone_replace_name(
          '%s%s' % (exp_dst_uri.object_name, exp_dst_uri.delim)
      )

    # Making naming behavior match how things work with local Linux cp and mv
    # operations depends on many factors, including whether the destination is
    # a container, the plurality of the source(s), and whether the mv command
    # is being used:
    # 1. For the "mv" command that specifies a non-existent destination subdir,
    #    renaming should occur at the level of the src subdir, vs appending
    #    that subdir beneath the dst subdir like is done for copying. For
    #    example:
    #      gsutil rm -R gs://bucket
    #      gsutil cp -R dir1 gs://bucket
    #      gsutil cp -R dir2 gs://bucket/subdir1
    #      gsutil mv gs://bucket/subdir1 gs://bucket/subdir2
    #    would (if using cp naming behavior) end up with paths like:
    #      gs://bucket/subdir2/subdir1/dir2/.svn/all-wcprops
    #    whereas mv naming behavior should result in:
    #      gs://bucket/subdir2/dir2/.svn/all-wcprops
    # 2. Copying from directories, buckets, or bucket subdirs should result in
    #    objects/files mirroring the source directory hierarchy. For example:
    #      gsutil cp dir1/dir2 gs://bucket
    #    should create the object gs://bucket/dir2/file2, assuming dir1/dir2
    #    contains file2).
    #    To be consistent with Linux cp behavior, there's one more wrinkle when
    #    working with subdirs: The resulting object names depend on whether the
    #    destination subdirectory exists. For example, if gs://bucket/subdir
    #    exists, the command:
    #      gsutil cp -R dir1/dir2 gs://bucket/subdir
    #    should create objects named like gs://bucket/subdir/dir2/a/b/c. In
    #    contrast, if gs://bucket/subdir does not exist, this same command
    #    should create objects named like gs://bucket/subdir/a/b/c.
    # 3. Copying individual files or objects to dirs, buckets or bucket subdirs
    #    should result in objects/files named by the final source file name
    #    component. Example:
    #      gsutil cp dir1/*.txt gs://bucket
    #    should create the objects gs://bucket/f1.txt and gs://bucket/f2.txt,
    #    assuming dir1 contains f1.txt and f2.txt.

    if (self.perform_mv and self.recursion_requested
        and src_uri_expands_to_multi and not have_existing_dest_subdir):
      # Case 1. Handle naming rules for bucket subdir mv. Here we want to
      # line up the src_uri against its expansion, to find the base to build
      # the new name. For example, running the command:
      #   gsutil mv gs://bucket/abcd gs://bucket/xyz
      # when processing exp_src_uri=gs://bucket/abcd/123
      # exp_src_uri_tail should become /123
      # Note: mv.py code disallows wildcard specification of source URI.
      exp_src_uri_tail = exp_src_uri.uri[len(src_uri.uri):]
      dst_key_name = '%s/%s' % (exp_dst_uri.object_name.rstrip('/'),
                                exp_src_uri_tail.strip('/'))
      return exp_dst_uri.clone_replace_name(dst_key_name)

    if src_uri_names_container and not exp_dst_uri.names_file():
      # Case 2. Build dst_key_name from subpath of exp_src_uri past
      # where src_uri ends. For example, for src_uri=gs://bucket/ and
      # exp_src_uri=gs://bucket/src_subdir/obj, dst_key_name should be
      # src_subdir/obj.
      src_uri_path_sans_final_dir = _GetPathBeforeFinalDir(src_uri)
      if exp_src_uri.is_cloud_uri():
        dst_key_name = exp_src_uri.versionless_uri[
            len(src_uri_path_sans_final_dir):].lstrip(src_uri.delim)
      else:
        dst_key_name = exp_src_uri.uri[
            len(src_uri_path_sans_final_dir):].lstrip(src_uri.delim)
      # Handle case where dst_uri is a non-existent subdir.
      if not have_existing_dest_subdir:
        dst_key_name = dst_key_name.partition(src_uri.delim)[-1]
      # Handle special case where src_uri was a directory named with '.' or
      # './', so that running a command like:
      #   gsutil cp -r . gs://dest
      # will produce obj names of the form gs://dest/abc instead of
      # gs://dest/./abc.
      if dst_key_name.startswith('.%s' % os.sep):
        dst_key_name = dst_key_name[2:]

    else:
      # Case 3. Name the destination by the final source path component.
      dst_key_name = exp_src_uri.object_name.rpartition(src_uri.delim)[-1]

    # Prepend the destination container's name (directory or bucket subdir)
    # to the key name computed above, avoiding a doubled delimiter.
    if (exp_dst_uri.is_file_uri()
        or self._ShouldTreatDstUriAsBucketSubDir(
            have_multiple_srcs, exp_dst_uri, have_existing_dest_subdir)):
      if exp_dst_uri.object_name.endswith(exp_dst_uri.delim):
        dst_key_name = '%s%s%s' % (
            exp_dst_uri.object_name.rstrip(exp_dst_uri.delim),
            exp_dst_uri.delim, dst_key_name)
      else:
        delim = exp_dst_uri.delim if exp_dst_uri.object_name else ''
        dst_key_name = '%s%s%s' % (exp_dst_uri.object_name, delim, dst_key_name)

    return exp_dst_uri.clone_replace_name(dst_key_name)
| 2072 | |
| 2073 def _FixWindowsNaming(self, src_uri, dst_uri): | |
| 2074 """ | |
| 2075 Rewrites the destination URI built by _ConstructDstUri() to translate | |
| 2076 Windows pathnames to cloud pathnames if needed. | |
| 2077 | |
| 2078 Args: | |
| 2079 src_uri: Source URI to be copied. | |
| 2080 dst_uri: The destination URI built by _ConstructDstUri(). | |
| 2081 | |
| 2082 Returns: | |
| 2083 StorageUri to use for copy. | |
| 2084 """ | |
| 2085 if (src_uri.is_file_uri() and src_uri.delim == '\\' | |
| 2086 and dst_uri.is_cloud_uri()): | |
| 2087 trans_uri_str = re.sub(r'\\', '/', dst_uri.uri) | |
| 2088 dst_uri = self.suri_builder.StorageUri(trans_uri_str) | |
| 2089 return dst_uri | |
| 2090 | |
| 2091 def _CopyFunc(self, name_expansion_result): | |
| 2092 """Worker function for performing the actual copy (and rm, for mv).""" | 607 """Worker function for performing the actual copy (and rm, for mv).""" |
| 2093 (exp_dst_uri, have_existing_dst_container) = self._ExpandDstUri( | 608 gsutil_api = GetCloudApiInstance(self, thread_state=thread_state) |
| 2094 self.args[-1]) | 609 |
| 2095 if self.perform_mv: | 610 copy_helper_opts = copy_helper.GetCopyHelperOpts() |
| 611 if copy_helper_opts.perform_mv: |
| 2096 cmd_name = 'mv' | 612 cmd_name = 'mv' |
| 2097 else: | 613 else: |
| 2098 cmd_name = self.command_name | 614 cmd_name = self.command_name |
| 2099 src_uri = self.suri_builder.StorageUri( | 615 src_url = name_expansion_result.source_storage_url |
| 2100 name_expansion_result.GetSrcUriStr()) | 616 exp_src_url = name_expansion_result.expanded_storage_url |
| 2101 exp_src_uri = self.suri_builder.StorageUri( | 617 src_url_names_container = name_expansion_result.names_container |
| 2102 name_expansion_result.GetExpandedUriStr()) | 618 have_multiple_srcs = name_expansion_result.is_multi_source_request |
| 2103 src_uri_names_container = name_expansion_result.NamesContainer() | 619 |
| 2104 src_uri_expands_to_multi = name_expansion_result.NamesContainer() | 620 if src_url.IsCloudUrl() and src_url.IsProvider(): |
| 2105 have_multiple_srcs = name_expansion_result.IsMultiSrcRequest() | |
| 2106 have_existing_dest_subdir = ( | |
| 2107 name_expansion_result.HaveExistingDstContainer()) | |
| 2108 if src_uri.names_provider(): | |
| 2109 raise CommandException( | 621 raise CommandException( |
| 2110 'The %s command does not allow provider-only source URIs (%s)' % | 622 'The %s command does not allow provider-only source URLs (%s)' % |
| 2111 (cmd_name, src_uri)) | 623 (cmd_name, src_url)) |
| 2112 if have_multiple_srcs: | 624 if have_multiple_srcs: |
| 2113 self._InsistDstUriNamesContainer(exp_dst_uri, | 625 copy_helper.InsistDstUrlNamesContainer( |
| 2114 have_existing_dst_container, | 626 self.exp_dst_url, self.have_existing_dst_container, cmd_name) |
| 2115 cmd_name) | 627 |
| 2116 | 628 # Various GUI tools (like the GCS web console) create placeholder objects |
| 2117 | 629 # ending with '/' when the user creates an empty directory. Normally these |
| 2118 if self.use_manifest and self.manifest.WasSuccessful(str(exp_src_uri)): | 630 # tools should delete those placeholders once objects have been written |
| 631 # "under" the directory, but sometimes the placeholders are left around. We |
| 632 # need to filter them out here, otherwise if the user tries to rsync from |
| 633 # GCS to a local directory it will result in a directory/file conflict |
| 634 # (e.g., trying to download an object called "mydata/" where the local |
| 635 # directory "mydata" exists). |
| 636 if IsCloudSubdirPlaceholder(exp_src_url): |
| 637 self.logger.info('Skipping cloud sub-directory placeholder object %s', |
| 638 exp_src_url) |
| 2119 return | 639 return |
| 2120 | 640 |
| 2121 if self.perform_mv: | 641 if copy_helper_opts.use_manifest and self.manifest.WasSuccessful( |
| 2122 if name_expansion_result.NamesContainer(): | 642 exp_src_url.url_string): |
| 643 return |
| 644 |
| 645 if copy_helper_opts.perform_mv: |
| 646 if name_expansion_result.names_container: |
| 2123 # Use recursion_requested when performing name expansion for the | 647 # Use recursion_requested when performing name expansion for the |
| 2124 # directory mv case so we can determine if any of the source URIs are | 648 # directory mv case so we can determine if any of the source URLs are |
| 2125 # directories (and then use cp -R and rm -R to perform the move, to | 649 # directories (and then use cp -R and rm -R to perform the move, to |
| 2126 # match the behavior of Linux mv (which when moving a directory moves | 650 # match the behavior of Linux mv (which when moving a directory moves |
| 2127 # all the contained files). | 651 # all the contained files). |
| 2128 self.recursion_requested = True | 652 self.recursion_requested = True |
| 2129 # Disallow wildcard src URIs when moving directories, as supporting it | 653 # Disallow wildcard src URLs when moving directories, as supporting it |
| 2130 # would make the name transformation too complex and would also be | 654 # would make the name transformation too complex and would also be |
| 2131 # dangerous (e.g., someone could accidentally move many objects to the | 655 # dangerous (e.g., someone could accidentally move many objects to the |
| 2132 # wrong name, or accidentally overwrite many objects). | 656 # wrong name, or accidentally overwrite many objects). |
| 2133 if ContainsWildcard(src_uri): | 657 if ContainsWildcard(src_url.url_string): |
| 2134 raise CommandException('The mv command disallows naming source ' | 658 raise CommandException('The mv command disallows naming source ' |
| 2135 'directories using wildcards') | 659 'directories using wildcards') |
| 2136 | 660 |
| 2137 if (exp_dst_uri.is_file_uri() | 661 if (self.exp_dst_url.IsFileUrl() |
| 2138 and not os.path.exists(exp_dst_uri.object_name) | 662 and not os.path.exists(self.exp_dst_url.object_name) |
| 2139 and have_multiple_srcs): | 663 and have_multiple_srcs): |
| 2140 os.makedirs(exp_dst_uri.object_name) | 664 os.makedirs(self.exp_dst_url.object_name) |
| 2141 | 665 |
| 2142 dst_uri = self._ConstructDstUri(src_uri, exp_src_uri, | 666 dst_url = copy_helper.ConstructDstUrl( |
| 2143 src_uri_names_container, | 667 src_url, exp_src_url, src_url_names_container, have_multiple_srcs, |
| 2144 src_uri_expands_to_multi, | 668 self.exp_dst_url, self.have_existing_dst_container, |
| 2145 have_multiple_srcs, exp_dst_uri, | 669 self.recursion_requested) |
| 2146 have_existing_dest_subdir) | 670 dst_url = copy_helper.FixWindowsNaming(src_url, dst_url) |
| 2147 dst_uri = self._FixWindowsNaming(src_uri, dst_uri) | 671 |
| 2148 | 672 copy_helper.CheckForDirFileConflict(exp_src_url, dst_url) |
| 2149 self._CheckForDirFileConflict(exp_src_uri, dst_uri) | 673 if copy_helper.SrcDstSame(exp_src_url, dst_url): |
| 2150 if self._SrcDstSame(exp_src_uri, dst_uri): | |
| 2151 raise CommandException('%s: "%s" and "%s" are the same file - ' | 674 raise CommandException('%s: "%s" and "%s" are the same file - ' |
| 2152 'abort.' % (cmd_name, exp_src_uri, dst_uri)) | 675 'abort.' % (cmd_name, exp_src_url, dst_url)) |
| 2153 | 676 |
| 2154 if dst_uri.is_cloud_uri() and dst_uri.is_version_specific: | 677 if dst_url.IsCloudUrl() and dst_url.HasGeneration(): |
| 2155 raise CommandException('%s: a version-specific URI\n(%s)\ncannot be ' | 678 raise CommandException('%s: a version-specific URL\n(%s)\ncannot be ' |
| 2156 'the destination for gsutil cp - abort.' | 679 'the destination for gsutil cp - abort.' |
| 2157 % (cmd_name, dst_uri)) | 680 % (cmd_name, dst_url)) |
| 2158 | 681 |
| 2159 elapsed_time = bytes_transferred = 0 | 682 elapsed_time = bytes_transferred = 0 |
| 2160 try: | 683 try: |
| 2161 if self.use_manifest: | 684 if copy_helper_opts.use_manifest: |
| 2162 self.manifest.Initialize(exp_src_uri, dst_uri) | 685 self.manifest.Initialize( |
| 2163 (elapsed_time, bytes_transferred, result_uri) = ( | 686 exp_src_url.url_string, dst_url.url_string) |
| 2164 self._PerformCopy(exp_src_uri, dst_uri)) | 687 (elapsed_time, bytes_transferred, result_url, md5) = ( |
| 2165 if self.use_manifest: | 688 copy_helper.PerformCopy( |
| 2166 if hasattr(dst_uri, 'md5'): | 689 self.logger, exp_src_url, dst_url, gsutil_api, |
| 2167 self.manifest.Set(exp_src_uri, 'md5', dst_uri.md5) | 690 self, _CopyExceptionHandler, allow_splitting=True, |
| 2168 self.manifest.SetResult(exp_src_uri, bytes_transferred, 'OK') | 691 headers=self.headers, manifest=self.manifest, |
| 692 gzip_exts=self.gzip_exts, test_method=self.test_method)) |
| 693 if copy_helper_opts.use_manifest: |
| 694 if md5: |
| 695 self.manifest.Set(exp_src_url.url_string, 'md5', md5) |
| 696 self.manifest.SetResult( |
| 697 exp_src_url.url_string, bytes_transferred, 'OK') |
| 698 if copy_helper_opts.print_ver: |
| 699 # Some cases don't return a version-specific URL (e.g., if destination |
| 700 # is a file). |
| 701 self.logger.info('Created: %s', result_url) |
| 2169 except ItemExistsError: | 702 except ItemExistsError: |
| 2170 message = 'Skipping existing item: %s' % dst_uri.uri | 703 message = 'Skipping existing item: %s' % dst_url |
| 2171 self.logger.info(message) | 704 self.logger.info(message) |
| 2172 if self.use_manifest: | 705 if copy_helper_opts.use_manifest: |
| 2173 self.manifest.SetResult(exp_src_uri, 0, 'skip', message) | 706 self.manifest.SetResult(exp_src_url.url_string, 0, 'skip', message) |
| 2174 except Exception, e: | 707 except Exception, e: |
| 2175 if self._IsNoClobberServerException(e): | 708 if (copy_helper_opts.no_clobber and |
| 2176 message = 'Rejected (noclobber): %s' % dst_uri.uri | 709 copy_helper.IsNoClobberServerException(e)): |
| 710 message = 'Rejected (noclobber): %s' % dst_url |
| 2177 self.logger.info(message) | 711 self.logger.info(message) |
| 2178 if self.use_manifest: | 712 if copy_helper_opts.use_manifest: |
| 2179 self.manifest.SetResult(exp_src_uri, 0, 'skip', message) | 713 self.manifest.SetResult( |
| 714 exp_src_url.url_string, 0, 'skip', message) |
| 2180 elif self.continue_on_error: | 715 elif self.continue_on_error: |
| 2181 message = 'Error copying %s: %s' % (src_uri.uri, str(e)) | 716 message = 'Error copying %s: %s' % (src_url, str(e)) |
| 2182 self.copy_failure_count += 1 | 717 self.op_failure_count += 1 |
| 2183 self.logger.error(message) | 718 self.logger.error(message) |
| 2184 if self.use_manifest: | 719 if copy_helper_opts.use_manifest: |
| 2185 self.manifest.SetResult(exp_src_uri, 0, 'error', message) | 720 self.manifest.SetResult( |
| 721 exp_src_url.url_string, 0, 'error', |
| 722 RemoveCRLFFromString(message)) |
| 2186 else: | 723 else: |
| 2187 if self.use_manifest: | 724 if copy_helper_opts.use_manifest: |
| 2188 self.manifest.SetResult(exp_src_uri, 0, 'error', str(e)) | 725 self.manifest.SetResult( |
| 726 exp_src_url.url_string, 0, 'error', str(e)) |
| 2189 raise | 727 raise |
| 2190 | 728 |
| 2191 if self.print_ver: | |
| 2192 # Some cases don't return a version-specific URI (e.g., if destination | |
| 2193 # is a file). | |
| 2194 if hasattr(result_uri, 'version_specific_uri'): | |
| 2195 self.logger.info('Created: %s' % result_uri.version_specific_uri) | |
| 2196 else: | |
| 2197 self.logger.info('Created: %s' % result_uri.uri) | |
| 2198 | |
| 2199 # TODO: If we ever use -n (noclobber) with -M (move) (not possible today | 729 # TODO: If we ever use -n (noclobber) with -M (move) (not possible today |
| 2200 # since we call copy internally from move and don't specify the -n flag) | 730 # since we call copy internally from move and don't specify the -n flag) |
| 2201 # we'll need to only remove the source when we have not skipped the | 731 # we'll need to only remove the source when we have not skipped the |
| 2202 # destination. | 732 # destination. |
| 2203 if self.perform_mv: | 733 if copy_helper_opts.perform_mv: |
| 2204 self.logger.info('Removing %s...', exp_src_uri) | 734 self.logger.info('Removing %s...', exp_src_url) |
| 2205 exp_src_uri.delete_key(validate=False, headers=self.headers) | 735 if exp_src_url.IsCloudUrl(): |
| 736 gsutil_api.DeleteObject(exp_src_url.bucket_name, |
| 737 exp_src_url.object_name, |
| 738 generation=exp_src_url.generation, |
| 739 provider=exp_src_url.scheme) |
| 740 else: |
| 741 os.unlink(exp_src_url.object_name) |
| 742 |
| 2206 with self.stats_lock: | 743 with self.stats_lock: |
| 2207 self.total_elapsed_time += elapsed_time | 744 self.total_elapsed_time += elapsed_time |
| 2208 self.total_bytes_transferred += bytes_transferred | 745 self.total_bytes_transferred += bytes_transferred |
| 2209 | 746 |
# Command entry point.
def RunCommand(self):
  """Runs the cp command: parses options, expands URIs, and performs copies.

  Dispatches to a streaming download when the destination is '-', otherwise
  expands the destination, builds a name-expansion iterator over the
  sources, and applies the copy function (possibly in parallel), tracking
  aggregate transfer statistics.

  Returns:
    0 on success.

  Raises:
    CommandException: on argument errors or if any file/object failed to
      transfer.
  """
  self._ParseArgs()

  self.total_elapsed_time = self.total_bytes_transferred = 0
  # A final argument of '-' (or 'file://-') means "stream objects to stdout".
  if self.args[-1] == '-' or self.args[-1] == 'file://-':
    self._HandleStreamingDownload()
    return 0

  if self.read_args_from_stdin:
    if len(self.args) != 1:
      raise CommandException('Source URIs cannot be specified with -I option')
    uri_strs = self._StdinIterator()
  else:
    if len(self.args) < 2:
      raise CommandException('Wrong number of arguments for "cp" command.')
    uri_strs = self.args[:-1]

  (exp_dst_uri, have_existing_dst_container) = self._ExpandDstUri(
      self.args[-1])
  # If the destination bucket has versioning enabled iterate with
  # all_versions=True. That way we'll copy all versions if the source bucket
  # is versioned; and by leaving all_versions=False if the destination bucket
  # has versioning disabled we will avoid copying old versions all to the same
  # un-versioned destination object.
  try:
    all_versions = (exp_dst_uri.names_bucket()
                    and exp_dst_uri.get_versioning_config(self.headers))
  except GSResponseError as e:
    # This happens if the user doesn't have OWNER access on the bucket (needed
    # to check if versioning is enabled). In this case fall back to copying
    # all versions (which can be inefficient for the reason noted in the
    # comment above). We don't try to warn the user because that would result
    # in false positive warnings (since we can't check if versioning is
    # enabled on the destination bucket).
    if e.status == 403:
      all_versions = True
    else:
      raise

  name_expansion_iterator = NameExpansionIterator(
      self.command_name, self.proj_id_handler, self.headers, self.debug,
      self.logger, self.bucket_storage_uri_class, uri_strs,
      self.recursion_requested or self.perform_mv,
      have_existing_dst_container=have_existing_dst_container,
      all_versions=all_versions)
  self.have_existing_dst_container = have_existing_dst_container

  # Use a lock to ensure accurate statistics in the face of
  # multi-threading/multi-processing.
  self.stats_lock = CreateLock()

  # Tracks if any copies failed.
  self.copy_failure_count = 0

  # Start the clock.
  start_time = time.time()

  # Tuple of attributes to share/manage across multiple processes in
  # parallel (-m) mode.
  shared_attrs = ('copy_failure_count', 'total_bytes_transferred')

  # Perform copy requests in parallel (-m) mode, if requested, using
  # configured number of parallel processes and threads. Otherwise,
  # perform requests with sequential function calls in current process.
  self.Apply(_CopyFuncWrapper, name_expansion_iterator,
             _CopyExceptionHandler, shared_attrs, fail_on_error=True)
  self.logger.debug(
      'total_bytes_transferred: %d', self.total_bytes_transferred)

  end_time = time.time()
  self.total_elapsed_time = end_time - start_time

  # Sometimes, particularly when running unit tests, the total elapsed time
  # is really small. On Windows, the timer resolution is too small and
  # causes total_elapsed_time to be zero.
  try:
    float(self.total_bytes_transferred) / float(self.total_elapsed_time)
  except ZeroDivisionError:
    self.total_elapsed_time = 0.01

  self.total_bytes_per_second = (float(self.total_bytes_transferred) /
                                 float(self.total_elapsed_time))

  if self.debug == 3:
    # Note that this only counts the actual GET and PUT bytes for the copy
    # - not any transfers for doing wildcard expansion, the initial HEAD
    # request boto performs when doing a bucket.get_key() operation, etc.
    if self.total_bytes_transferred != 0:
      self.logger.info(
          'Total bytes copied=%d, total elapsed time=%5.3f secs (%sps)',
          self.total_bytes_transferred, self.total_elapsed_time,
          MakeHumanReadable(self.total_bytes_per_second))
  if self.copy_failure_count:
    plural_str = ''
    if self.copy_failure_count > 1:
      plural_str = 's'
    raise CommandException('%d file%s/object%s could not be transferred.' % (
        self.copy_failure_count, plural_str, plural_str))

  return 0
| 2310 | 849 |
| 2311 def _ParseArgs(self): | 850 def _ParseOpts(self): |
| 2312 self.perform_mv = False | 851 perform_mv = False |
| 852 # exclude_symlinks is handled by Command parent class, so save in Command |
| 853 # state rather than CopyHelperOpts. |
| 2313 self.exclude_symlinks = False | 854 self.exclude_symlinks = False |
| 2314 self.no_clobber = False | 855 no_clobber = False |
| 856 # continue_on_error is handled by Command parent class, so save in Command |
| 857 # state rather than CopyHelperOpts. |
| 2315 self.continue_on_error = False | 858 self.continue_on_error = False |
| 2316 self.daisy_chain = False | 859 daisy_chain = False |
| 2317 self.read_args_from_stdin = False | 860 read_args_from_stdin = False |
| 2318 self.print_ver = False | 861 print_ver = False |
| 2319 self.use_manifest = False | 862 use_manifest = False |
| 863 preserve_acl = False |
| 864 canned_acl = None |
| 865 # canned_acl is handled by a helper function in parent |
| 866 # Command class, so save in Command state rather than CopyHelperOpts. |
| 867 self.canned = None |
| 868 |
| 869 # Files matching these extensions should be gzipped before uploading. |
| 870 self.gzip_exts = [] |
| 871 |
| 872 # Test hook for stopping transfers. |
| 873 halt_at_byte = None |
| 2320 | 874 |
| 2321 # self.recursion_requested initialized in command.py (so can be checked | 875 # self.recursion_requested initialized in command.py (so can be checked |
| 2322 # in parent class for all commands). | 876 # in parent class for all commands). |
| 877 self.manifest = None |
| 2323 if self.sub_opts: | 878 if self.sub_opts: |
| 2324 for o, a in self.sub_opts: | 879 for o, a in self.sub_opts: |
| 880 if o == '-a': |
| 881 canned_acl = a |
| 882 self.canned = True |
| 2325 if o == '-c': | 883 if o == '-c': |
| 2326 self.continue_on_error = True | 884 self.continue_on_error = True |
| 2327 elif o == '-D': | 885 elif o == '-D': |
| 2328 self.daisy_chain = True | 886 daisy_chain = True |
| 2329 elif o == '-e': | 887 elif o == '-e': |
| 2330 self.exclude_symlinks = True | 888 self.exclude_symlinks = True |
| 889 elif o == '--haltatbyte': |
| 890 halt_at_byte = long(a) |
| 2331 elif o == '-I': | 891 elif o == '-I': |
| 2332 self.read_args_from_stdin = True | 892 read_args_from_stdin = True |
| 2333 elif o == '-L': | 893 elif o == '-L': |
| 2334 self.use_manifest = True | 894 use_manifest = True |
| 2335 self.manifest = _Manifest(a) | 895 self.manifest = Manifest(a) |
| 2336 elif o == '-M': | 896 elif o == '-M': |
| 2337 # Note that we signal to the cp command to perform a move (copy | 897 # Note that we signal to the cp command to perform a move (copy |
| 2338 # followed by remove) and use directory-move naming rules by passing | 898 # followed by remove) and use directory-move naming rules by passing |
| 2339 # the undocumented (for internal use) -M option when running the cp | 899 # the undocumented (for internal use) -M option when running the cp |
| 2340 # command from mv.py. | 900 # command from mv.py. |
| 2341 self.perform_mv = True | 901 perform_mv = True |
| 2342 elif o == '-n': | 902 elif o == '-n': |
| 2343 self.no_clobber = True | 903 no_clobber = True |
| 2344 elif o == '-q': | 904 elif o == '-p': |
| 2345 self.logger.warning( | 905 preserve_acl = True |
| 2346 'Warning: gsutil cp -q is deprecated, and will be removed in the ' | |
| 2347 'future.\nPlease use gsutil -q cp ... instead.') | |
| 2348 self.logger.setLevel(level=logging.WARNING) | |
| 2349 elif o == '-r' or o == '-R': | 906 elif o == '-r' or o == '-R': |
| 2350 self.recursion_requested = True | 907 self.recursion_requested = True |
| 2351 elif o == '-v': | 908 elif o == '-v': |
| 2352 self.print_ver = True | 909 print_ver = True |
| 910 elif o == '-z': |
| 911 self.gzip_exts = [x.strip() for x in a.split(',')] |
| 912 if preserve_acl and canned_acl: |
| 913 raise CommandException( |
| 914 'Specifying both the -p and -a options together is invalid.') |
| 915 return CreateCopyHelperOpts( |
| 916 perform_mv=perform_mv, |
| 917 no_clobber=no_clobber, |
| 918 daisy_chain=daisy_chain, |
| 919 read_args_from_stdin=read_args_from_stdin, |
| 920 print_ver=print_ver, |
| 921 use_manifest=use_manifest, |
| 922 preserve_acl=preserve_acl, |
| 923 canned_acl=canned_acl, |
| 924 halt_at_byte=halt_at_byte) |
| 2353 | 925 |
def _HandleStreamingDownload(self):
  """Streams all matched source objects to stdout (destination was '-').

  Raises:
    CommandException: if no source URIs matched.
  """
  # Destination is <STDOUT>. Manipulate sys.stdout so as to redirect all
  # debug messages to <STDERR>.
  stdout_fp = sys.stdout
  sys.stdout = sys.stderr
  did_some_work = False
  for uri_str in self.args[0:len(self.args)-1]:
    for uri in self.WildcardIterator(uri_str).IterUris():
      did_some_work = True
      # NOTE(review): first positional arg appears to be validate=False
      # (skip an extra existence check) — confirm against boto's get_key.
      key = uri.get_key(False, self.headers)
      (elapsed_time, bytes_transferred) = self._PerformDownloadToStream(
          key, uri, stdout_fp, self.headers)
      self.total_elapsed_time += elapsed_time
      self.total_bytes_transferred += bytes_transferred
  if not did_some_work:
    raise CommandException('No URIs matched')
  if self.debug == 3:
    if self.total_bytes_transferred != 0:
      self.logger.info(
          'Total bytes copied=%d, total elapsed time=%5.3f secs (%sps)',
          self.total_bytes_transferred, self.total_elapsed_time,
          MakeHumanReadable(float(self.total_bytes_transferred) /
                            float(self.total_elapsed_time)))
def _StdinIterator(self):
  """Generator yielding each line read from stdin, sans trailing CRLF."""
  for raw_line in sys.stdin:
    # Strip CRLF.
    yield raw_line.rstrip()
| 2384 def _SrcDstSame(self, src_uri, dst_uri): | |
| 2385 """Checks if src_uri and dst_uri represent the same object or file. | |
| 2386 | |
| 2387 We don't handle anything about hard or symbolic links. | |
| 2388 | 928 |
| 2389 Args: | 929 Args: |
| 2390 src_uri: Source StorageUri. | 930 exp_dst_url: Wildcard-expanded destination StorageUrl. |
| 2391 dst_uri: Destination StorageUri. | 931 |
| 932 Raises: |
| 933 AccessDeniedException: if there was a permissions problem accessing the |
| 934 bucket or its versioning config. |
| 935 CommandException: if URL refers to a cloud bucket that does not exist. |
| 2392 | 936 |
| 2393 Returns: | 937 Returns: |
| 2394 Bool indicator. | 938 apitools Bucket with versioning configuration. |
| 2395 """ | 939 """ |
| 2396 if src_uri.is_file_uri() and dst_uri.is_file_uri(): | 940 bucket = None |
| 2397 # Translate a/b/./c to a/b/c, so src=dst comparison below works. | 941 if exp_dst_url.IsCloudUrl() and exp_dst_url.IsBucket(): |
| 2398 new_src_path = os.path.normpath(src_uri.object_name) | 942 try: |
| 2399 new_dst_path = os.path.normpath(dst_uri.object_name) | 943 bucket = self.gsutil_api.GetBucket( |
| 2400 return (src_uri.clone_replace_name(new_src_path).uri == | 944 exp_dst_url.bucket_name, provider=exp_dst_url.scheme, |
| 2401 dst_uri.clone_replace_name(new_dst_path).uri) | 945 fields=['versioning']) |
| 2402 else: | 946 except AccessDeniedException, e: |
| 2403 return (src_uri.uri == dst_uri.uri and | 947 raise |
| 2404 src_uri.generation == dst_uri.generation and | 948 except NotFoundException, e: |
| 2405 src_uri.version_id == dst_uri.version_id) | 949 raise CommandException('Destination bucket %s does not exist.' % |
| 2406 | 950 exp_dst_url) |
| 2407 def _ShouldTreatDstUriAsBucketSubDir(self, have_multiple_srcs, dst_uri, | 951 except Exception, e: |
| 2408 have_existing_dest_subdir): | 952 raise CommandException('Error retrieving destination bucket %s: %s' % |
| 2409 """ | 953 (exp_dst_url, e.message)) |
| 2410 Checks whether dst_uri should be treated as a bucket "sub-directory". The | 954 return bucket |
| 2411 decision about whether something constitutes a bucket "sub-directory" | |
| 2412 depends on whether there are multiple sources in this request and whether | |
| 2413 there is an existing bucket subdirectory. For example, when running the | |
| 2414 command: | |
| 2415 gsutil cp file gs://bucket/abc | |
| 2416 if there's no existing gs://bucket/abc bucket subdirectory we should copy | |
| 2417 file to the object gs://bucket/abc. In contrast, if | |
| 2418 there's an existing gs://bucket/abc bucket subdirectory we should copy | |
| 2419 file to gs://bucket/abc/file. And regardless of whether gs://bucket/abc | |
| 2420 exists, when running the command: | |
| 2421 gsutil cp file1 file2 gs://bucket/abc | |
| 2422 we should copy file1 to gs://bucket/abc/file1 (and similarly for file2). | |
| 2423 | |
| 2424 Note that we don't disallow naming a bucket "sub-directory" where there's | |
| 2425 already an object at that URI. For example it's legitimate (albeit | |
| 2426 confusing) to have an object called gs://bucket/dir and | |
| 2427 then run the command | |
| 2428 gsutil cp file1 file2 gs://bucket/dir | |
| 2429 Doing so will end up with objects gs://bucket/dir, gs://bucket/dir/file1, | |
| 2430 and gs://bucket/dir/file2. | |
| 2431 | |
| 2432 Args: | |
| 2433 have_multiple_srcs: Bool indicator of whether this is a multi-source | |
| 2434 operation. | |
| 2435 dst_uri: StorageUri to check. | |
| 2436 have_existing_dest_subdir: bool indicator whether dest is an existing | |
| 2437 subdirectory. | |
| 2438 | |
| 2439 Returns: | |
| 2440 bool indicator. | |
| 2441 """ | |
| 2442 return ((have_multiple_srcs and dst_uri.is_cloud_uri()) | |
| 2443 or (have_existing_dest_subdir)) | |
| 2444 | |
| 2445 def _ShouldTreatDstUriAsSingleton(self, have_multiple_srcs, | |
| 2446 have_existing_dest_subdir, dst_uri): | |
| 2447 """ | |
| 2448 Checks that dst_uri names a singleton (file or object) after | |
| 2449 dir/wildcard expansion. The decision is more nuanced than simply | |
| 2450 dst_uri.names_singleton()) because of the possibility that an object path | |
| 2451 might name a bucket sub-directory. | |
| 2452 | |
| 2453 Args: | |
| 2454 have_multiple_srcs: Bool indicator of whether this is a multi-source | |
| 2455 operation. | |
| 2456 have_existing_dest_subdir: bool indicator whether dest is an existing | |
| 2457 subdirectory. | |
| 2458 dst_uri: StorageUri to check. | |
| 2459 | |
| 2460 Returns: | |
| 2461 bool indicator. | |
| 2462 """ | |
| 2463 if have_multiple_srcs: | |
| 2464 # Only a file meets the criteria in this case. | |
| 2465 return dst_uri.names_file() | |
| 2466 return not have_existing_dest_subdir and dst_uri.names_singleton() | |
| 2467 | |
| 2468 def _IsNoClobberServerException(self, e): | |
| 2469 """ | |
| 2470 Checks to see if the server attempted to clobber a file after we specified | |
| 2471 in the header that we didn't want the file clobbered. | |
| 2472 | |
| 2473 Args: | |
| 2474 e: The Exception that was generated by a failed copy operation | |
| 2475 | |
| 2476 Returns: | |
| 2477 bool indicator - True indicates that the server did attempt to clobber | |
| 2478 an existing file. | |
| 2479 """ | |
| 2480 return self.no_clobber and ( | |
| 2481 (isinstance(e, GSResponseError) and e.status==412) or | |
| 2482 (isinstance(e, ResumableUploadException) and 'code 412' in e.message)) | |
| 2483 | |
| 2484 | |
class _Manifest(object):
  """Stores the manifest items for the CpCommand class.

  A manifest is a CSV log of copy results: one row per source URI, with
  destination, timing, md5, size and result columns. On construction, any
  existing manifest at the same path is parsed so sources previously
  recorded as 'OK' or 'skip' can be filtered out on a re-run.
  """

  def __init__(self, path):
    # self.items contains a dictionary of rows, keyed by source URI.
    self.items = {}
    # Maps source URI -> prior result ('OK' or 'skip') loaded from an
    # earlier run's manifest file.
    self.manifest_filter = {}
    # Manager-backed lock so writes are serialized across processes as well
    # as threads.
    self.lock = multiprocessing.Manager().Lock()

    self.manifest_path = os.path.expanduser(path)
    self._ParseManifest()
    self._CreateManifestFile()

  def _ParseManifest(self):
    """Loads and parses a manifest file.

    This information will be used to skip any files that have a skip or
    OK status.

    Raises:
      CommandException: if the manifest file exists but cannot be read, or
        is missing the required 'Source'/'Result' header columns.
    """
    try:
      if os.path.exists(self.manifest_path):
        with open(self.manifest_path, 'rb') as f:
          first_row = True
          reader = csv.reader(f)
          for row in reader:
            if first_row:
              try:
                source_index = row.index('Source')
                result_index = row.index('Result')
              except ValueError:
                # No header and thus not a valid manifest file.
                raise CommandException(
                    'Missing headers in manifest file: %s' % self.manifest_path)
              first_row = False
            source = row[source_index]
            result = row[result_index]
            if result in ['OK', 'skip']:
              # We're always guaranteed to take the last result of a specific
              # source uri.
              self.manifest_filter[source] = result
    except IOError:
      # Bug fix: this previously interpolated an undefined name ('path'),
      # turning any read failure into a NameError. Report the actual
      # manifest path instead.
      raise CommandException('Could not parse %s' % self.manifest_path)

  def WasSuccessful(self, src):
    """Returns whether the specified src uri was marked as successful."""
    return src in self.manifest_filter

  def _CreateManifestFile(self):
    """Opens the manifest file and assigns it to the file pointer.

    Writes the CSV header row if the file is new or empty.

    Raises:
      CommandException: if the manifest file could not be created.
    """
    try:
      if ((not os.path.exists(self.manifest_path))
          or (os.stat(self.manifest_path).st_size == 0)):
        # Add headers to the new file.
        with open(self.manifest_path, 'wb', 1) as f:
          writer = csv.writer(f)
          writer.writerow(['Source',
                           'Destination',
                           'Start',
                           'End',
                           'Md5',
                           'UploadId',
                           'Source Size',
                           'Bytes Transferred',
                           'Result',
                           'Description'])
    except IOError:
      raise CommandException('Could not create manifest file.')

  def Set(self, uri, key, value):
    """Sets a single field on the in-memory row keyed by uri."""
    if value is None:
      # In case we don't have any information to set we bail out here.
      # This is so that we don't clobber existing information.
      # To zero information pass '' instead of None.
      return
    if uri in self.items:
      self.items[uri][key] = value
    else:
      self.items[uri] = {key: value}

  def Initialize(self, source_uri, destination_uri):
    """Records the start of a copy from source_uri to destination_uri."""
    # Always use the source_uri as the key for the item. This is unique.
    self.Set(source_uri, 'source_uri', source_uri)
    self.Set(source_uri, 'destination_uri', destination_uri)
    self.Set(source_uri, 'start_time', datetime.datetime.utcnow())

  def SetResult(self, source_uri, bytes_transferred, result,
                description=''):
    """Records the final outcome for source_uri and flushes its row."""
    self.Set(source_uri, 'bytes', bytes_transferred)
    self.Set(source_uri, 'result', result)
    self.Set(source_uri, 'description', description)
    self.Set(source_uri, 'end_time', datetime.datetime.utcnow())
    self._WriteRowToManifestFile(source_uri)
    self._RemoveItemFromManifest(source_uri)

  def _WriteRowToManifestFile(self, uri):
    """Appends the completed row for uri to the manifest CSV file."""
    row_item = self.items[uri]
    data = [
        str(row_item['source_uri']),
        str(row_item['destination_uri']),
        '%sZ' % row_item['start_time'].isoformat(),
        '%sZ' % row_item['end_time'].isoformat(),
        row_item['md5'] if 'md5' in row_item else '',
        row_item['upload_id'] if 'upload_id' in row_item else '',
        str(row_item['size']) if 'size' in row_item else '',
        str(row_item['bytes']) if 'bytes' in row_item else '',
        row_item['result'],
        row_item['description']]

    # Acquire a lock to prevent multiple threads writing to the same file at
    # the same time. This would cause a garbled mess in the manifest file.
    with self.lock:
      with open(self.manifest_path, 'a', 1) as f:  # 1 == line buffered
        writer = csv.writer(f)
        writer.writerow(data)

  def _RemoveItemFromManifest(self, uri):
    # Remove the item from the dictionary since we're done with it and
    # we don't want the dictionary to grow too large in memory for no good
    # reason.
    del self.items[uri]
| 2604 | |
| 2605 | |
class ItemExistsError(Exception):
  """Raised for objects that are skipped because they already exist."""
| 2609 | |
| 2610 | |
| 2611 def _GetPathBeforeFinalDir(uri): | |
| 2612 """ | |
| 2613 Returns the part of the path before the final directory component for the | |
| 2614 given URI, handling cases for file system directories, bucket, and bucket | |
| 2615 subdirectories. Example: for gs://bucket/dir/ we'll return 'gs://bucket', | |
| 2616 and for file://dir we'll return file:// | |
| 2617 | |
| 2618 Args: | |
| 2619 uri: StorageUri. | |
| 2620 | |
| 2621 Returns: | |
| 2622 String name of above-described path, sans final path separator. | |
| 2623 """ | |
| 2624 sep = uri.delim | |
| 2625 # If the source uri argument had a wildcard and wasn't expanded by the | |
| 2626 # shell, then uri.names_file() will always be true, so we check for | |
| 2627 # this case explicitly. | |
| 2628 assert ((not uri.names_file()) or ContainsWildcard(uri.object_name)) | |
| 2629 if uri.names_directory(): | |
| 2630 past_scheme = uri.uri[len('file://'):] | |
| 2631 if past_scheme.find(sep) == -1: | |
| 2632 return 'file://' | |
| 2633 else: | |
| 2634 return 'file://%s' % past_scheme.rstrip(sep).rpartition(sep)[0] | |
| 2635 if uri.names_bucket(): | |
| 2636 return '%s://' % uri.scheme | |
| 2637 # Else it names a bucket subdir. | |
| 2638 return uri.uri.rstrip(sep).rpartition(sep)[0] | |
| 2639 | |
def _HashFilename(filename):
  """
  Apply a hash function (SHA1) to shorten the passed file name. The spec
  for the hashed file name is as follows:

      TRACKER_<hash>.<trailing>

  where hash is a SHA1 hash on the original file name and trailing is
  the last 16 chars from the original file name. Max file name lengths
  vary by operating system so the goal of this function is to ensure
  the hashed version takes fewer than 100 characters (the result here is
  always 8 + 40 + 1 + 16 = 65 characters).

  Args:
    filename: file name to be hashed.

  Returns:
    shorter, hashed version of passed file name
  """
  # Normalize to UTF-8 bytes before hashing so the same logical name always
  # produces the same digest. NOTE: uses the Python 2 'unicode' builtin.
  if isinstance(filename, unicode):
    filename = filename.encode('utf-8')
  else:
    filename = unicode(filename, 'utf8').encode('utf-8')
  m = hashlib.sha1(filename)
  return "TRACKER_" + m.hexdigest() + '.' + filename[-16:]
| 2664 | |
| 2665 def _DivideAndCeil(dividend, divisor): | |
| 2666 """Returns ceil(dividend / divisor), taking care to avoid the pitfalls of | |
| 2667 floating point arithmetic that could otherwise yield the wrong result | |
| 2668 for large numbers. | |
| 2669 """ | |
| 2670 quotient = dividend // divisor | |
| 2671 if (dividend % divisor) != 0: | |
| 2672 quotient += 1 | |
| 2673 return quotient | |
| 2674 | |
def _GetPartitionInfo(file_size, max_components, default_component_size):
  """Computes how a file should be split into composable components.

  Args:
    file_size: The number of bytes in the file to be partitioned.
    max_components: The maximum number of components that can be composed.
    default_component_size: The size of a component, assuming that
                            max_components is infinite.

  Returns:
    Tuple (num_components, component_size). component_size applies to every
    component except possibly the last, which is smaller whenever
    file_size != 0 (mod num_components).
  """
  # How many components we'd use with no cap on their number.
  ideal_count = _DivideAndCeil(file_size, default_component_size)
  # Clamp into [2, max_components]; the lower bound of 2 is applied last.
  num_components = max(min(ideal_count, max_components), 2)
  # Spread the bytes evenly (rounding up) across the chosen count.
  component_size = _DivideAndCeil(file_size, num_components)
  return (num_components, component_size)
| 2696 | |
| 2697 def _DeleteKeyFn(cls, key): | |
| 2698 """Wrapper function to be used with command.Apply().""" | |
| 2699 return key.delete_key() | |
| 2700 | |
| 2701 def _ParseParallelUploadTrackerFile(tracker_file, tracker_file_lock): | |
| 2702 """Parse the tracker file (if any) from the last parallel composite upload | |
| 2703 attempt. The tracker file is of the format described in | |
| 2704 _CreateParallelUploadTrackerFile. If the file doesn't exist or cannot be | |
| 2705 read, then the upload will start from the beginning. | |
| 2706 | |
| 2707 Args: | |
| 2708 tracker_file: The name of the file to parse. | |
| 2709 tracker_file_lock: Lock protecting access to the tracker file. | |
| 2710 | |
| 2711 Returns: | |
| 2712 random_prefix: A randomly-generated prefix to the name of the | |
| 2713 temporary components. | |
| 2714 existing_objects: A list of ObjectFromTracker objects representing | |
| 2715 the set of files that have already been uploaded. | |
| 2716 """ | |
| 2717 existing_objects = [] | |
| 2718 try: | |
| 2719 with tracker_file_lock: | |
| 2720 f = open(tracker_file, 'r') | |
| 2721 lines = f.readlines() | |
| 2722 lines = [line.strip() for line in lines] | |
| 2723 f.close() | |
| 2724 except IOError as e: | |
| 2725 # We can't read the tracker file, so generate a new random prefix. | |
| 2726 lines = [str(random.randint(1, (10 ** 10) - 1))] | |
| 2727 | |
| 2728 # Ignore non-existent file (happens first time an upload | |
| 2729 # is attempted on a file), but warn user for other errors. | |
| 2730 if e.errno != errno.ENOENT: | |
| 2731 # Will restart because we failed to read in the file. | |
| 2732 print('Couldn\'t read parallel upload tracker file (%s): %s. ' | |
| 2733 'Restarting upload from scratch.' % (tracker_file, e.strerror)) | |
| 2734 | |
| 2735 # The first line contains the randomly-generated prefix. | |
| 2736 random_prefix = lines[0] | |
| 2737 | |
| 2738 # The remaining lines were written in pairs to describe a single component | |
| 2739 # in the form: | |
| 2740 # object_name (without random prefix) | |
| 2741 # generation | |
| 2742 # Newlines are used as the delimiter because only newlines and carriage | |
| 2743 # returns are invalid characters in object names, and users can specify | |
| 2744 # a custom prefix in the config file. | |
| 2745 i = 1 | |
| 2746 while i < len(lines): | |
| 2747 (name, generation) = (lines[i], lines[i+1]) | |
| 2748 if generation == '': | |
| 2749 generation = None | |
| 2750 existing_objects.append(ObjectFromTracker(name, generation)) | |
| 2751 i += 2 | |
| 2752 return (random_prefix, existing_objects) | |
| 2753 | |
def _AppendComponentTrackerToParallelUploadTrackerFile(tracker_file, component,
                                                       tracker_file_lock):
  """Records one newly-uploaded component in an existing tracker file.

  The component is appended using the line format described in
  _CreateParallelUploadTrackerFile.
  """
  new_lines = ['%s\n' % line for line in
               _GetParallelUploadTrackerFileLinesForComponents([component])]
  # Serialize appends from concurrent component uploads.
  with tracker_file_lock:
    with open(tracker_file, 'a') as f:
      f.writelines(new_lines)
| 2764 | |
def _CreateParallelUploadTrackerFile(tracker_file, random_prefix, components,
                                     tracker_file_lock):
  """Writes information about components that were successfully uploaded so
  that the upload can be resumed at a later date. The tracker file has
  the format:
    random_prefix
    temp_object_1_name
    temp_object_1_generation
    .
    .
    .
    temp_object_N_name
    temp_object_N_generation
  where N is the number of components that have been successfully uploaded.

  Args:
    tracker_file: The name of the parallel upload tracker file.
    random_prefix: The randomly-generated prefix that was used for
                   uploading any existing components.
    components: A list of ObjectFromTracker objects that were uploaded.
    tracker_file_lock: Lock protecting access to the tracker file.
  """
  lines = [random_prefix]
  lines += _GetParallelUploadTrackerFileLinesForComponents(components)
  lines = [line + '\n' for line in lines]
  with tracker_file_lock:
    # Mode 'w' truncates any existing contents, so no separate clearing
    # step is needed before writing.
    with open(tracker_file, 'w') as f:
      f.writelines(lines)
| 2793 | |
| 2794 def _GetParallelUploadTrackerFileLinesForComponents(components): | |
| 2795 """Return a list of the lines that should appear in the parallel composite | |
| 2796 upload tracker file representing the given components, using the format | |
| 2797 as described in _CreateParallelUploadTrackerFile.""" | |
| 2798 lines = [] | |
| 2799 for component in components: | |
| 2800 generation = None | |
| 2801 generation = component.generation | |
| 2802 if not generation: | |
| 2803 generation = '' | |
| 2804 lines += [component.object_name, generation] | |
| 2805 return lines | |
| 2806 | |
def FilterExistingComponents(dst_args, existing_components,
                             bucket_name, suri_builder):
  """Given the list of all target objects based on partitioning the file and
  the list of objects that have already been uploaded successfully,
  this function determines which objects should be uploaded, which
  existing components are still valid, and which existing components should
  be deleted.

  Args:
    dst_args: The map of file_name -> PerformResumableUploadIfAppliesArgs
              calculated by partitioning the file.
    existing_components: A list of ObjectFromTracker objects that have been
                         uploaded in the past.
    bucket_name: The name of the bucket in which the components exist.
    suri_builder: Used to construct StorageUris for the existing components.

  Returns:
    components_to_upload: List of components that need to be uploaded.
    uploaded_components: List of components that have already been
                         uploaded and are still valid.
    existing_objects_to_delete: List of components that have already
                                been uploaded, but are no longer valid
                                and are in a versioned bucket, and
                                therefore should be deleted.
  """
  # Any planned component with no previously-uploaded counterpart must be
  # uploaded from scratch.
  components_to_upload = []
  existing_component_names = [component.object_name
                              for component in existing_components]
  for component_name in dst_args:
    if not (component_name in existing_component_names):
      components_to_upload.append(dst_args[component_name])

  # Tracks component names already classified below, so duplicate tracker
  # entries aren't processed twice.
  objects_already_chosen = []

  # Don't reuse any temporary components whose MD5 doesn't match the current
  # MD5 of the corresponding part of the file. If the bucket is versioned,
  # also make sure that we delete the existing temporary version.
  existing_objects_to_delete = []
  uploaded_components = []
  for tracker_object in existing_components:
    if ((not tracker_object.object_name in dst_args.keys())
        or tracker_object.object_name in objects_already_chosen):
      # This could happen if the component size has changed. This also serves
      # to handle object names that get duplicated in the tracker file due
      # to people doing things they shouldn't (e.g., overwriting an existing
      # temporary component in a versioned bucket).

      uri = MakeGsUri(bucket_name, tracker_object.object_name, suri_builder)
      uri.generation = tracker_object.generation
      existing_objects_to_delete.append(uri)
      continue

    dst_arg = dst_args[tracker_object.object_name]
    file_part = FilePart(dst_arg.filename, dst_arg.file_start,
                         dst_arg.file_length)
    # TODO: calculate MD5's in parallel when possible.
    content_md5 = _CalculateMd5FromContents(file_part)

    try:
      # Get the MD5 of the currently-existing component.
      blr = BucketListingRef(dst_arg.dst_uri)
      etag = blr.GetKey().etag
    except Exception as e:
      # We don't actually care what went wrong - we couldn't retrieve the
      # object to check the MD5, so just upload it again.
      etag = None
    # NOTE(review): assumes the component's etag is its quoted MD5 hex
    # digest (true for non-composite GCS objects) — any mismatch forces a
    # re-upload, which is the safe direction.
    if etag != (('"%s"') % content_md5):
      components_to_upload.append(dst_arg)
      objects_already_chosen.append(tracker_object.object_name)
      if tracker_object.generation:
        # If the old object doesn't have a generation (i.e., it isn't in a
        # versioned bucket), then we will just overwrite it anyway.
        invalid_component_with_generation = copy.deepcopy(dst_arg.dst_uri)
        invalid_component_with_generation.generation = tracker_object.generation
        existing_objects_to_delete.append(invalid_component_with_generation)
    else:
      # MD5 matches: the previously-uploaded component is still valid.
      uri = copy.deepcopy(dst_arg.dst_uri)
      uri.generation = tracker_object.generation
      uploaded_components.append(uri)
      objects_already_chosen.append(tracker_object.object_name)

  if uploaded_components:
    logging.info(("Found %d existing temporary components to reuse.")
                 % len(uploaded_components))

  return (components_to_upload, uploaded_components,
          existing_objects_to_delete)
| 2893 | |
def MakeGsUri(bucket, filename, suri_builder):
  """Builds a StorageUri for the named object in the given GCS bucket."""
  return suri_builder.StorageUri('%s/%s' % (bucket, filename))
| 2897 | |
| 2898 def _CalculateMd5FromContents(file): | |
| 2899 """Calculates the MD5 hash of the contents of a file. | |
| 2900 | |
| 2901 Args: | |
| 2902 file: An already-open file object. | |
| 2903 """ | |
| 2904 current_md5 = md5() | |
| 2905 file.seek(0) | |
| 2906 while True: | |
| 2907 data = file.read(8192) | |
| 2908 if not data: | |
| 2909 break | |
| 2910 current_md5.update(data) | |
| 2911 file.seek(0) | |
| 2912 return current_md5.hexdigest() | |
| OLD | NEW |