Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(149)

Side by Side Diff: gslib/commands/cp.py

Issue 698893003: Update checked in version of gsutil to version 4.6 (Closed) Base URL: http://dart.googlecode.com/svn/third_party/gsutil/
Patch Set: Created 6 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « gslib/commands/cors.py ('k') | gslib/commands/defacl.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 # -*- coding: utf-8 -*-
1 # Copyright 2011 Google Inc. All Rights Reserved. 2 # Copyright 2011 Google Inc. All Rights Reserved.
2 # Copyright 2011, Nexenta Systems Inc. 3 # Copyright 2011, Nexenta Systems Inc.
3 # 4 #
4 # Licensed under the Apache License, Version 2.0 (the "License"); 5 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License. 6 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at 7 # You may obtain a copy of the License at
7 # 8 #
8 # http://www.apache.org/licenses/LICENSE-2.0 9 # http://www.apache.org/licenses/LICENSE-2.0
9 # 10 #
10 # Unless required by applicable law or agreed to in writing, software 11 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS, 12 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and 14 # See the License for the specific language governing permissions and
14 # limitations under the License. 15 # limitations under the License.
16 """Implementation of Unix-like cp command for cloud storage providers."""
15 17
16 # Get the system logging module, not our local logging module.
17 from __future__ import absolute_import 18 from __future__ import absolute_import
18 19
19 import binascii
20 import boto
21 import copy
22 import crcmod
23 import csv
24 import datetime
25 import errno
26 import gslib
27 import gzip
28 import hashlib
29 import logging
30 import mimetypes
31 import mmap
32 import multiprocessing
33 import os 20 import os
34 import platform
35 import random
36 import re
37 import stat
38 import subprocess
39 import sys
40 import tempfile
41 import textwrap
42 import threading
43 import time 21 import time
44 import traceback 22 import traceback
45 23
46 from gslib.util import AddAcceptEncoding 24 from gslib import copy_helper
47 25 from gslib.cat_helper import CatHelper
48 try: 26 from gslib.cloud_api import AccessDeniedException
49 from hashlib import md5 27 from gslib.cloud_api import NotFoundException
50 except ImportError:
51 from md5 import md5
52
53 from boto import config
54 from boto.exception import GSResponseError
55 from boto.exception import ResumableUploadException
56 from boto.gs.resumable_upload_handler import ResumableUploadHandler
57 from boto.s3.keyfile import KeyFile
58 from boto.s3.resumable_download_handler import ResumableDownloadHandler
59 from boto.storage_uri import BucketStorageUri
60 from boto.storage_uri import StorageUri
61 from collections import namedtuple
62 from gslib.bucket_listing_ref import BucketListingRef
63 from gslib.command import COMMAND_NAME
64 from gslib.command import COMMAND_NAME_ALIASES
65 from gslib.command import Command 28 from gslib.command import Command
66 from gslib.command import FILE_URIS_OK
67 from gslib.command import MAX_ARGS
68 from gslib.command import MIN_ARGS
69 from gslib.command import PROVIDER_URIS_OK
70 from gslib.command import SUPPORTED_SUB_ARGS
71 from gslib.command import URIS_START_ARG
72 from gslib.commands.compose import MAX_COMPONENT_COUNT 29 from gslib.commands.compose import MAX_COMPONENT_COUNT
73 from gslib.commands.compose import MAX_COMPOSE_ARITY 30 from gslib.copy_helper import CreateCopyHelperOpts
74 from gslib.commands.config import DEFAULT_PARALLEL_COMPOSITE_UPLOAD_COMPONENT_SI ZE 31 from gslib.copy_helper import ItemExistsError
75 from gslib.commands.config import DEFAULT_PARALLEL_COMPOSITE_UPLOAD_THRESHOLD 32 from gslib.copy_helper import Manifest
33 from gslib.copy_helper import PARALLEL_UPLOAD_TEMP_NAMESPACE
34 from gslib.cs_api_map import ApiSelector
76 from gslib.exception import CommandException 35 from gslib.exception import CommandException
77 from gslib.file_part import FilePart
78 from gslib.help_provider import HELP_NAME
79 from gslib.help_provider import HELP_NAME_ALIASES
80 from gslib.help_provider import HELP_ONE_LINE_SUMMARY
81 from gslib.help_provider import HELP_TEXT
82 from gslib.help_provider import HelpType
83 from gslib.help_provider import HELP_TYPE
84 from gslib.name_expansion import NameExpansionIterator 36 from gslib.name_expansion import NameExpansionIterator
85 from gslib.util import BOTO_IS_SECURE 37 from gslib.storage_url import ContainsWildcard
86 from gslib.util import CreateLock 38 from gslib.util import CreateLock
87 from gslib.util import CreateTrackerDirIfNeeded 39 from gslib.util import GetCloudApiInstance
88 from gslib.util import GetConfigFilePath 40 from gslib.util import IsCloudSubdirPlaceholder
89 from gslib.util import ParseErrorDetail
90 from gslib.util import HumanReadableToBytes
91 from gslib.util import IS_WINDOWS
92 from gslib.util import MakeHumanReadable 41 from gslib.util import MakeHumanReadable
93 from gslib.util import NO_MAX 42 from gslib.util import NO_MAX
94 from gslib.util import TWO_MB 43 from gslib.util import RemoveCRLFFromString
95 from gslib.util import UsingCrcmodExtension
96 from gslib.wildcard_iterator import ContainsWildcard
97 from gslib.name_expansion import NameExpansionResult
98
99
100 SLOW_CRC_WARNING = """
101 WARNING: Downloading this composite object requires integrity checking with
102 CRC32c, but your crcmod installation isn't using the module's C extension, so
103 the hash computation will likely throttle download performance. For help
104 installing the extension, please see:
105 $ gsutil help crcmod
106 To disable slow integrity checking, see the "check_hashes" option in your boto
107 config file.
108 """
109
110 SLOW_CRC_EXCEPTION = CommandException(
111 """
112 Downloading this composite object requires integrity checking with CRC32c, but
113 your crcmod installation isn't using the module's C extension, so the hash
114 computation will likely throttle download performance. For help installing the
115 extension, please see:
116 $ gsutil help crcmod
117 To download regardless of crcmod performance or to skip slow integrity checks,
118 see the "check_hashes" option in your boto config file.""")
119
120 NO_HASH_CHECK_WARNING = """
121 WARNING: This download will not be validated since your crcmod installation
122 doesn't use the module's C extension, so the hash computation would likely
123 throttle download performance. For help in installing the extension, please see:
124 $ gsutil help crcmod
125 To force integrity checking, see the "check_hashes" option in your boto config
126 file.
127 """
128
129 NO_SERVER_HASH_EXCEPTION = CommandException(
130 """
131 This object has no server-supplied hash for performing integrity
132 checks. To skip integrity checking for such objects, see the "check_hashes"
133 option in your boto config file.""")
134
135 NO_SERVER_HASH_WARNING = """
136 WARNING: This object has no server-supplied hash for performing integrity
137 checks. To force integrity checking, see the "check_hashes" option in your boto
138 config file.
139 """
140
141 PARALLEL_UPLOAD_TEMP_NAMESPACE = (
142 u'/gsutil/tmp/parallel_composite_uploads/for_details_see/gsutil_help_cp/')
143
144 PARALLEL_UPLOAD_STATIC_SALT = u"""
145 PARALLEL_UPLOAD_SALT_TO_PREVENT_COLLISIONS.
146 The theory is that no user will have prepended this to the front of
147 one of their object names and then done an MD5 hash of the name, and
148 then prepended PARALLEL_UPLOAD_TEMP_NAMESPACE to the front of their object
149 name. Note that there will be no problems with object name length since we
150 hash the original name.
151 """
152
153 # In order to prevent people from uploading thousands of tiny files in parallel
154 # (which, apart from being useless, is likely to cause them to be throttled
155 # for the compose calls), don't allow files smaller than this to use parallel
156 # composite uploads.
157 MIN_PARALLEL_COMPOSITE_FILE_SIZE = 20971520 # 20 MB
158 44
159 SYNOPSIS_TEXT = """ 45 SYNOPSIS_TEXT = """
160 <B>SYNOPSIS</B> 46 <B>SYNOPSIS</B>
161 gsutil cp [OPTION]... src_uri dst_uri 47 gsutil cp [OPTION]... src_url dst_url
162 gsutil cp [OPTION]... src_uri... dst_uri 48 gsutil cp [OPTION]... src_url... dst_url
163 gsutil cp [OPTION]... -I dst_uri 49 gsutil cp [OPTION]... -I dst_url
164 """ 50 """
165 51
166 DESCRIPTION_TEXT = """ 52 DESCRIPTION_TEXT = """
167 <B>DESCRIPTION</B> 53 <B>DESCRIPTION</B>
168 The gsutil cp command allows you to copy data between your local file 54 The gsutil cp command allows you to copy data between your local file
169 system and the cloud, copy data within the cloud, and copy data between 55 system and the cloud, copy data within the cloud, and copy data between
170 cloud storage providers. For example, to copy all text files from the 56 cloud storage providers. For example, to copy all text files from the
171 local directory to a bucket you could do: 57 local directory to a bucket you could do:
172 58
173 gsutil cp *.txt gs://my_bucket 59 gsutil cp *.txt gs://my_bucket
174 60
175 Similarly, you can download text files from a bucket by doing: 61 Similarly, you can download text files from a bucket by doing:
176 62
177 gsutil cp gs://my_bucket/*.txt . 63 gsutil cp gs://my_bucket/*.txt .
178 64
179 If you want to copy an entire directory tree you need to use the -R option: 65 If you want to copy an entire directory tree you need to use the -R option:
180 66
181 gsutil cp -R dir gs://my_bucket 67 gsutil cp -R dir gs://my_bucket
182 68
183 If you have a large number of files to upload you might want to use the 69 If you have a large number of files to upload you might want to use the
184 gsutil -m option, to perform a parallel (multi-threaded/multi-processing) 70 gsutil -m option, to perform a parallel (multi-threaded/multi-processing)
185 copy: 71 copy:
186 72
187 gsutil -m cp -R dir gs://my_bucket 73 gsutil -m cp -R dir gs://my_bucket
188 74
189 You can pass a list of URIs to copy on STDIN instead of as command line 75 You can pass a list of URLs (one per line) to copy on STDIN instead of as
190 arguments by using the -I option. This allows you to use gsutil in a 76 command line arguments by using the -I option. This allows you to use gsutil
191 pipeline to copy files and objects as generated by a program, such as: 77 in a pipeline to upload or download files / objects as generated by a program,
78 such as:
192 79
193 some_program | gsutil -m cp -I gs://my_bucket 80 some_program | gsutil -m cp -I gs://my_bucket
194 81
195 The contents of STDIN can name files, cloud URIs, and wildcards of files 82 or:
196 and cloud URIs. 83
84 some_program | gsutil -m cp -I ./download_dir
85
86 The contents of STDIN can name files, cloud URLs, and wildcards of files
87 and cloud URLs.
197 """ 88 """
198 89
199 NAME_CONSTRUCTION_TEXT = """ 90 NAME_CONSTRUCTION_TEXT = """
200 <B>HOW NAMES ARE CONSTRUCTED</B> 91 <B>HOW NAMES ARE CONSTRUCTED</B>
201 The gsutil cp command strives to name objects in a way consistent with how 92 The gsutil cp command strives to name objects in a way consistent with how
202 Linux cp works, which causes names to be constructed in varying ways depending 93 Linux cp works, which causes names to be constructed in varying ways depending
203 on whether you're performing a recursive directory copy or copying 94 on whether you're performing a recursive directory copy or copying
204 individually named objects; and whether you're copying to an existing or 95 individually named objects; and whether you're copying to an existing or
205 non-existent directory. 96 non-existent directory.
206 97
(...skipping 23 matching lines...) Expand all
230 121
231 There's an additional wrinkle when working with subdirectories: the resulting 122 There's an additional wrinkle when working with subdirectories: the resulting
232 names depend on whether the destination subdirectory exists. For example, 123 names depend on whether the destination subdirectory exists. For example,
233 if gs://my_bucket/subdir exists as a subdirectory, the command: 124 if gs://my_bucket/subdir exists as a subdirectory, the command:
234 125
235 gsutil cp -R dir1/dir2 gs://my_bucket/subdir 126 gsutil cp -R dir1/dir2 gs://my_bucket/subdir
236 127
237 will create objects named like gs://my_bucket/subdir/dir2/a/b/c. In contrast, 128 will create objects named like gs://my_bucket/subdir/dir2/a/b/c. In contrast,
238 if gs://my_bucket/subdir does not exist, this same gsutil cp command will 129 if gs://my_bucket/subdir does not exist, this same gsutil cp command will
239 create objects named like gs://my_bucket/subdir/a/b/c. 130 create objects named like gs://my_bucket/subdir/a/b/c.
131
132 Note: If you use the
133 `Google Developers Console <https://console.developers.google.com>`_
134 to create folders, it does so by creating a "placeholder" object that ends
135 with a "/" character. gsutil skips these objects when downloading from the
136 cloud to the local file system, because attempting to create a file that
137 ends with a "/" is not allowed on Linux and MacOS. Because of this, it is
138 recommended that you not create objects that end with "/" (unless you don't
139 need to be able to download such objects using gsutil).
240 """ 140 """
241 141
242 SUBDIRECTORIES_TEXT = """ 142 SUBDIRECTORIES_TEXT = """
243 <B>COPYING TO/FROM SUBDIRECTORIES; DISTRIBUTING TRANSFERS ACROSS MACHINES</B> 143 <B>COPYING TO/FROM SUBDIRECTORIES; DISTRIBUTING TRANSFERS ACROSS MACHINES</B>
244 You can use gsutil to copy to and from subdirectories by using a command 144 You can use gsutil to copy to and from subdirectories by using a command
245 like: 145 like:
246 146
247 gsutil cp -R dir gs://my_bucket/data 147 gsutil cp -R dir gs://my_bucket/data
248 148
249 This will cause dir and all of its files and nested subdirectories to be 149 This will cause dir and all of its files and nested subdirectories to be
(...skipping 25 matching lines...) Expand all
275 gsutil -m cp -R gs://my_bucket/data/result_set_[7-9]* dir 175 gsutil -m cp -R gs://my_bucket/data/result_set_[7-9]* dir
276 176
277 Note that dir could be a local directory on each machine, or it could 177 Note that dir could be a local directory on each machine, or it could
278 be a directory mounted off of a shared file server; whether the latter 178 be a directory mounted off of a shared file server; whether the latter
279 performs acceptably may depend on a number of things, so we recommend 179 performs acceptably may depend on a number of things, so we recommend
280 you experiment and find out what works best for you. 180 you experiment and find out what works best for you.
281 """ 181 """
282 182
283 COPY_IN_CLOUD_TEXT = """ 183 COPY_IN_CLOUD_TEXT = """
284 <B>COPYING IN THE CLOUD AND METADATA PRESERVATION</B> 184 <B>COPYING IN THE CLOUD AND METADATA PRESERVATION</B>
285 If both the source and destination URI are cloud URIs from the same 185 If both the source and destination URL are cloud URLs from the same
286 provider, gsutil copies data "in the cloud" (i.e., without downloading 186 provider, gsutil copies data "in the cloud" (i.e., without downloading
287 to and uploading from the machine where you run gsutil). In addition to 187 to and uploading from the machine where you run gsutil). In addition to
288 the performance and cost advantages of doing this, copying in the cloud 188 the performance and cost advantages of doing this, copying in the cloud
289 preserves metadata (like Content-Type and Cache-Control). In contrast, 189 preserves metadata (like Content-Type and Cache-Control). In contrast,
290 when you download data from the cloud it ends up in a file, which has 190 when you download data from the cloud it ends up in a file, which has
291 no associated metadata. Thus, unless you have some way to hold on to 191 no associated metadata. Thus, unless you have some way to hold on to
292 or re-create that metadata, downloading to a file will not retain the 192 or re-create that metadata, downloading to a file will not retain the
293 metadata. 193 metadata.
294 194
295 Note that by default, the gsutil cp command does not copy the object 195 Note that by default, the gsutil cp command does not copy the object
296 ACL to the new object, and instead will use the default bucket ACL (see 196 ACL to the new object, and instead will use the default bucket ACL (see
297 "gsutil help defacl"). You can override this behavior with the -p 197 "gsutil help defacl"). You can override this behavior with the -p
298 option (see OPTIONS below). 198 option (see OPTIONS below).
199
200 One additional note about copying in the cloud: If the destination bucket has
201 versioning enabled, gsutil cp will copy all versions of the source object(s).
202 For example:
203
204 gsutil cp gs://bucket1/obj gs://bucket2
205
206 will cause all versions of gs://bucket1/obj to be copied to gs://bucket2.
207 """
208
209 FAILURE_HANDLING_TEXT = """
210 <B>CHECKSUM VALIDATION AND FAILURE HANDLING</B>
211 At the end of every upload or download, the gsutil cp command validates that
212 the checksum of the source file/object matches the checksum of the
213 destination file/object. If the checksums do not match, gsutil will delete
214 the invalid copy and print a warning message. This very rarely happens, but
215 if it does, please contact gs-team@google.com.
216
217 The cp command will retry when failures occur, but if enough failures happen
218 during a particular copy or delete operation the command will skip that object
219 and move on. At the end of the copy run if any failures were not successfully
220 retried, the cp command will report the count of failures, and exit with
221 non-zero status.
222
223 Note that there are cases where retrying will never succeed, such as if you
224 don't have write permission to the destination bucket or if the destination
225 path for some objects is longer than the maximum allowed length.
226
227 For more details about gsutil's retry handling, please see
228 "gsutil help retries".
299 """ 229 """
300 230
301 RESUMABLE_TRANSFERS_TEXT = """ 231 RESUMABLE_TRANSFERS_TEXT = """
302 <B>RESUMABLE TRANSFERS</B> 232 <B>RESUMABLE TRANSFERS</B>
303 gsutil automatically uses the Google Cloud Storage resumable upload 233 gsutil automatically uses the Google Cloud Storage resumable upload feature
304 feature whenever you use the cp command to upload an object that is larger 234 whenever you use the cp command to upload an object that is larger than 2
305 than 2 MB. You do not need to specify any special command line options 235 MB. You do not need to specify any special command line options to make this
306 to make this happen. If your upload is interrupted you can restart the 236 happen. If your upload is interrupted you can restart the upload by running
307 upload by running the same cp command that you ran to start the upload. 237 the same cp command that you ran to start the upload. Until the upload
238 has completed successfully, it will not be visible at the destination object
239 and will not replace any existing object the upload is intended to overwrite.
240 (However, see the section on PARALLEL COMPOSITE UPLOADS, which may leave
241 temporary component objects in place during the upload process.)
308 242
309 Similarly, gsutil automatically performs resumable downloads (using HTTP 243 Similarly, gsutil automatically performs resumable downloads (using HTTP
310 standard Range GET operations) whenever you use the cp command to download an 244 standard Range GET operations) whenever you use the cp command to download an
311 object larger than 2 MB. 245 object larger than 2 MB. In this case the partially downloaded file will be
246 visible as soon as it starts being written. Thus, before you attempt to use
247 any files downloaded by gsutil you should make sure the download completed
248 successfully, by checking the exit status from the gsutil command. This can
249 be done in a bash script, for example, by doing:
250
251 gsutil cp gs://your-bucket/your-object ./local-file
252 if [ "$status" -ne "0" ] ; then
253 << Code that handles failures >>
254 fi
312 255
313 Resumable uploads and downloads store some state information in a file 256 Resumable uploads and downloads store some state information in a file
314 in ~/.gsutil named by the destination object or file. If you attempt to 257 in ~/.gsutil named by the destination object or file. If you attempt to
315 resume a transfer from a machine with a different directory, the transfer 258 resume a transfer from a machine with a different directory, the transfer
316 will start over from scratch. 259 will start over from scratch.
317 260
318 See also "gsutil help prod" for details on using resumable transfers 261 See also "gsutil help prod" for details on using resumable transfers
319 in production. 262 in production.
320 """ 263 """
321 264
322 STREAMING_TRANSFERS_TEXT = """ 265 STREAMING_TRANSFERS_TEXT = """
323 <B>STREAMING TRANSFERS</B> 266 <B>STREAMING TRANSFERS</B>
324 Use '-' in place of src_uri or dst_uri to perform a streaming 267 Use '-' in place of src_url or dst_url to perform a streaming
325 transfer. For example: 268 transfer. For example:
326 269
327 long_running_computation | gsutil cp - gs://my_bucket/obj 270 long_running_computation | gsutil cp - gs://my_bucket/obj
328 271
329 Streaming transfers do not support resumable uploads/downloads. 272 Streaming transfers do not support resumable uploads/downloads.
330 (The Google resumable transfer protocol has a way to support streaming 273 (The Google resumable transfer protocol has a way to support streaming
331 transfers, but gsutil doesn't currently implement support for this.) 274 transfers, but gsutil doesn't currently implement support for this.)
332 """ 275 """
333 276
334 PARALLEL_COMPOSITE_UPLOADS_TEXT = """ 277 PARALLEL_COMPOSITE_UPLOADS_TEXT = """
335 <B>PARALLEL COMPOSITE UPLOADS</B> 278 <B>PARALLEL COMPOSITE UPLOADS</B>
336 gsutil automatically uses `object composition <https://developers.google.com/s torage/docs/composite-objects>`_ 279 gsutil can automatically use
337 to perform uploads in parallel for large, local files being uploaded to 280 `object composition <https://developers.google.com/storage/docs/composite-obje cts>`_
338 Google Cloud Storage. This means that, by default, a large file will be split 281 to perform uploads in parallel for large, local files being uploaded to Google
339 into component pieces that will be uploaded in parallel. Those components will 282 Cloud Storage. This means that, if enabled (see next paragraph), a large file
340 then be composed in the cloud, and the temporary components in the cloud will 283 will be split into component pieces that will be uploaded in parallel. Those
341 be deleted after successful composition. No additional local disk space is 284 components will then be composed in the cloud, and the temporary components in
342 required for this operation. 285 the cloud will be deleted after successful composition. No additional local
286 disk space is required for this operation.
343 287
344 Any file whose size exceeds the "parallel_composite_upload_threshold" config 288 If the "parallel_composite_upload_threshold" config value is not 0 (which
345 variable will trigger this feature by default. The ideal size of a 289 disables the feature), any file whose size exceeds the specified size will
346 component can also be set with the "parallel_composite_upload_component_size" 290 trigger a parallel composite upload. Note that at present parallel composite
347 config variable. See the .boto config file for details about how these values 291 uploads are disabled by default, because using composite objects requires a
348 are used. 292 compiled crcmod (see "gsutil help crcmod"), and for operating systems that
293 don't already have this package installed this makes gsutil harder to use.
294 Google is actively working with a number of the Linux distributions to get
295 crcmod included with the stock distribution. Once that is done we will
296 re-enable parallel composite uploads by default in gsutil.
297
298 The ideal size of a component can also be set with the
299 "parallel_composite_upload_component_size" config variable. See the comments
300 in the .boto config file for details about how these values are used.
349 301
350 If the transfer fails prior to composition, running the command again will 302 If the transfer fails prior to composition, running the command again will
351 take advantage of resumable uploads for those components that failed, and 303 take advantage of resumable uploads for those components that failed, and
352 the component objects will be deleted after the first successful attempt. 304 the component objects will be deleted after the first successful attempt.
353 Any temporary objects that were uploaded successfully before gsutil failed 305 Any temporary objects that were uploaded successfully before gsutil failed
354 will still exist until the upload is completed successfully. The temporary 306 will still exist until the upload is completed successfully. The temporary
355 objects will be named in the following fashion: 307 objects will be named in the following fashion:
356 <random ID>%s<hash> 308 <random ID>%s<hash>
357 where <random ID> is some numerical value, and <hash> is an MD5 hash (not 309 where <random ID> is some numerical value, and <hash> is an MD5 hash (not
358 related to the hash of the contents of the file or object). 310 related to the hash of the contents of the file or object).
359 311
312 To avoid leaving temporary objects around, you should make sure to check the
313 exit status from the gsutil command. This can be done in a bash script, for
314 example, by doing:
315
316 gsutil cp ./local-file gs://your-bucket/your-object
317 if [ "$status" -ne "0" ] ; then
318 << Code that handles failures >>
319 fi
320
321 Or, for copying a directory, use this instead:
322
323 gsutil cp -c -L cp.log -R ./dir gs://bucket
324 if [ "$status" -ne "0" ] ; then
325 << Code that handles failures >>
326 fi
327
360 One important caveat is that files uploaded in this fashion are still subject 328 One important caveat is that files uploaded in this fashion are still subject
361 to the maximum number of components limit. For example, if you upload a large 329 to the maximum number of components limit. For example, if you upload a large
362 file that gets split into %d components, and try to compose it with another 330 file that gets split into %d components, and try to compose it with another
363 object with %d components, the operation will fail because it exceeds the %d 331 object with %d components, the operation will fail because it exceeds the %d
364 component limit. If you wish to compose an object later and the component 332 component limit. If you wish to compose an object later and the component
365 limit is a concern, it is recommended that you disable parallel composite 333 limit is a concern, it is recommended that you disable parallel composite
366 uploads for that transfer. 334 uploads for that transfer.
367 335
368 Also note that an object uploaded using this feature will have a CRC32C hash, 336 Also note that an object uploaded using this feature will have a CRC32C hash,
369 but it will not have an MD5 hash. For details see 'gsutil help crc32c'. 337 but it will not have an MD5 hash. For details see 'gsutil help crc32c'.
370 338
371 Note that this feature can be completely disabled by setting the 339 Note that this feature can be completely disabled by setting the
372 "parallel_composite_upload_threshold" variable in the .boto config file to 0. 340 "parallel_composite_upload_threshold" variable in the .boto config file to 0.
373 """ % (PARALLEL_UPLOAD_TEMP_NAMESPACE, 10, MAX_COMPONENT_COUNT - 9, 341 """ % (PARALLEL_UPLOAD_TEMP_NAMESPACE, 10, MAX_COMPONENT_COUNT - 9,
374 MAX_COMPONENT_COUNT) 342 MAX_COMPONENT_COUNT)
375 343
344
376 CHANGING_TEMP_DIRECTORIES_TEXT = """ 345 CHANGING_TEMP_DIRECTORIES_TEXT = """
377 <B>CHANGING TEMP DIRECTORIES</B> 346 <B>CHANGING TEMP DIRECTORIES</B>
378 gsutil writes data to a temporary directory in several cases: 347 gsutil writes data to a temporary directory in several cases:
379 348
380 - when compressing data to be uploaded (see the -z option) 349 - when compressing data to be uploaded (see the -z option)
381 - when decompressing data being downloaded (when the data has 350 - when decompressing data being downloaded (when the data has
382 Content-Encoding:gzip, e.g., as happens when uploaded using gsutil cp -z) 351 Content-Encoding:gzip, e.g., as happens when uploaded using gsutil cp -z)
383 - when running integration tests (using the gsutil test command) 352 - when running integration tests (using the gsutil test command)
384 353
385 In these cases it's possible the temp file location on your system that 354 In these cases it's possible the temp file location on your system that
(...skipping 12 matching lines...) Expand all
398 export TMPDIR=/some/directory 367 export TMPDIR=/some/directory
399 368
400 On Windows 7 you can change the TMPDIR environment variable from Start -> 369 On Windows 7 you can change the TMPDIR environment variable from Start ->
401 Computer -> System -> Advanced System Settings -> Environment Variables. 370 Computer -> System -> Advanced System Settings -> Environment Variables.
402 You need to reboot after making this change for it to take effect. (Rebooting 371 You need to reboot after making this change for it to take effect. (Rebooting
403 is not necessary after running the export command on Linux and MacOS.) 372 is not necessary after running the export command on Linux and MacOS.)
404 """ 373 """
405 374
406 OPTIONS_TEXT = """ 375 OPTIONS_TEXT = """
407 <B>OPTIONS</B> 376 <B>OPTIONS</B>
408 -a canned_acl Sets named canned_acl when uploaded objects created. See 377 -a canned_acl Sets named canned_acl when uploaded objects created. See
409 'gsutil help acls' for further details. 378 'gsutil help acls' for further details.
410 379
411 -c If an error occurs, continue to attempt to copy the remaining 380 -c If an error occurs, continue to attempt to copy the remaining
412 files. Note that this option is always true when running 381 files. If any copies were unsuccessful, gsutil's exit status
413 "gsutil -m cp". 382 will be non-zero even if this flag is set. This option is
383 implicitly set when running "gsutil -m cp...". Note: -c only
384 applies to the actual copying operation. If an error occurs
385 while iterating over the files in the local directory (e.g.,
386 invalid Unicode file name) gsutil will print an error message
387 and abort.
414 388
415 -D Copy in "daisy chain" mode, i.e., copying between two buckets by 389 -D Copy in "daisy chain" mode, i.e., copying between two buckets
416 hooking a download to an upload, via the machine where gsutil is 390 by hooking a download to an upload, via the machine where
417 run. By default, data are copied between two buckets 391 gsutil is run. By default, data are copied between two buckets
418 "in the cloud", i.e., without needing to copy via the machine 392 "in the cloud", i.e., without needing to copy via the machine
419 where gsutil runs. 393 where gsutil runs.
420 394
421 By default, a "copy in the cloud" when the source is a composite 395 By default, a "copy in the cloud" when the source is a
422 object will retain the composite nature of the object. However, 396 composite object will retain the composite nature of the
423 Daisy chain mode can be used to change a composite object into 397 object. However, Daisy chain mode can be used to change a
424 a non-composite object. For example: 398 composite object into a non-composite object. For example:
425 399
426 gsutil cp -D -p gs://bucket/obj gs://bucket/obj_tmp 400 gsutil cp -D -p gs://bucket/obj gs://bucket/obj_tmp
427 gsutil mv -p gs://bucket/obj_tmp gs://bucket/obj 401 gsutil mv -p gs://bucket/obj_tmp gs://bucket/obj
428 402
429 Note: Daisy chain mode is automatically used when copying 403 Note: Daisy chain mode is automatically used when copying
430 between providers (e.g., to copy data from Google Cloud Storage 404 between providers (e.g., to copy data from Google Cloud Storage
431 to another provider). 405 to another provider).
432 406
433 -e Exclude symlinks. When specified, symbolic links will not be 407 -e Exclude symlinks. When specified, symbolic links will not be
434 copied. 408 copied.
435 409
436 -L <file> Outputs a manifest log file with detailed information about each 410 -I Causes gsutil to read the list of files or objects to copy from
437 item that was copied. This manifest contains the following 411 stdin. This allows you to run a program that generates the list
438 information for each item: 412 of files to upload/download.
439 413
440 - Source path. 414 -L <file> Outputs a manifest log file with detailed information about
441 - Destination path. 415 each item that was copied. This manifest contains the following
442 - Source size. 416 information for each item:
443 - Bytes transferred.
444 - MD5 hash.
445 - UTC date and time transfer was started in ISO 8601 format.
446 - UTC date and time transfer was completed in ISO 8601 format.
447 - Upload id, if a resumable upload was performed.
448 - Final result of the attempted transfer, success or failure.
449 - Failure details, if any.
450 417
451 If the log file already exists, gsutil will use the file as an 418 - Source path.
452 input to the copy process, and will also append log items to the 419 - Destination path.
453 existing file. Files/objects that are marked in the existing log 420 - Source size.
454 file as having been successfully copied (or skipped) will be 421 - Bytes transferred.
455 ignored. Files/objects without entries will be copied and ones 422 - MD5 hash.
456 previously marked as unsuccessful will be retried. This can be 423 - UTC date and time transfer was started in ISO 8601 format.
457 used in conjunction with the -c option to build a script that 424 - UTC date and time transfer was completed in ISO 8601 format.
458 copies a large number of objects reliably, using a bash script 425 - Upload id, if a resumable upload was performed.
459 like the following: 426 - Final result of the attempted transfer, success or failure.
427 - Failure details, if any.
460 428
461 status=1 429 If the log file already exists, gsutil will use the file as an
462 while [ $status -ne 0 ] ; do 430 input to the copy process, and will also append log items to
463 gsutil cp -c -L cp.log -R ./dir gs://bucket 431 the existing file. Files/objects that are marked in the
464 status=$? 432 existing log file as having been successfully copied (or
465 done 433 skipped) will be ignored. Files/objects without entries will be
434 copied and ones previously marked as unsuccessful will be
435 retried. This can be used in conjunction with the -c option to
436 build a script that copies a large number of objects reliably,
437 using a bash script like the following:
466 438
467 The -c option will cause copying to continue after failures 439 until gsutil cp -c -L cp.log -R ./dir gs://bucket; do
468 occur, and the -L option will allow gsutil to pick up where it 440 sleep 1
469 left off without duplicating work. The loop will continue 441 done
470 running as long as gsutil exits with a non-zero status (such a
471 status indicates there was at least one failure during the
472 gsutil run).
473 442
474 -n No-clobber. When specified, existing files or objects at the 443 The -c option will cause copying to continue after failures
475 destination will not be overwritten. Any items that are skipped 444 occur, and the -L option will allow gsutil to pick up where it
476 by this option will be reported as being skipped. This option 445 left off without duplicating work. The loop will continue
477 will perform an additional HEAD request to check if an item 446 running as long as gsutil exits with a non-zero status (such a
478 exists before attempting to upload the data. This will save 447 status indicates there was at least one failure during the
479 retransmitting data, but the additional HTTP requests may make 448 gsutil run).
480 small object transfers slower and more expensive.
481 449
482 -p Causes source ACLs to be preserved when copying in the cloud. 450 Note: If you're trying to synchronize the contents of a
483 Note that this option has performance and cost implications, 451 directory and a bucket (or two buckets), see
484 because it is essentially performing three requests ('acl get', 452 'gsutil help rsync'.
485 cp, 'acl set'). (The performance issue can be mitigated to some
486 degree by using gsutil -m cp to cause parallel copying.)
487 453
488 You can avoid the additional performance and cost of using cp -p 454 -n No-clobber. When specified, existing files or objects at the
489 if you want all objects in the destination bucket to end up with 455 destination will not be overwritten. Any items that are skipped
490 the same ACL by setting a default ACL on that bucket instead of 456 by this option will be reported as being skipped. This option
491 using cp -p. See "help gsutil defacl". 457 will perform an additional GET request to check if an item
458 exists before attempting to upload the data. This will save
459 retransmitting data, but the additional HTTP requests may make
460 small object transfers slower and more expensive.
492 461
493 Note that it's not valid to specify both the -a and -p options 462 -p Causes ACLs to be preserved when copying in the cloud. Note
494 together. 463 that this option has performance and cost implications when
464 using the XML API, as it requires separate HTTP calls for
465 interacting with ACLs. The performance issue can be mitigated
466 to some degree by using gsutil -m cp to cause parallel copying.
467 Also, this option only works if you have OWNER access to all of
468 the objects that are copied.
495 469
496 -q Deprecated. Please use gsutil -q cp ... instead. 470 You can avoid the additional performance and cost of using
471 cp -p if you want all objects in the destination bucket to end
472 up with the same ACL by setting a default object ACL on that
473 bucket instead of using cp -p. See "help gsutil defacl".
497 474
498 -R, -r Causes directories, buckets, and bucket subdirectories to be 475 Note that it's not valid to specify both the -a and -p options
499 copied recursively. If you neglect to use this option for 476 together.
500 an upload, gsutil will copy any files it finds and skip any
501 directories. Similarly, neglecting to specify -R for a download
502 will cause gsutil to copy any objects at the current bucket
503 directory level, and skip any subdirectories.
504 477
505 -v Requests that the version-specific URI for each uploaded object 478 -R, -r Causes directories, buckets, and bucket subdirectories to be
506 be printed. Given this URI you can make future upload requests 479 copied recursively. If you neglect to use this option for
507 that are safe in the face of concurrent updates, because Google 480 an upload, gsutil will copy any files it finds and skip any
508 Cloud Storage will refuse to perform the update if the current 481 directories. Similarly, neglecting to specify -R for a download
509 object version doesn't match the version-specific URI. See 482 will cause gsutil to copy any objects at the current bucket
510 'gsutil help versions' for more details. 483 directory level, and skip any subdirectories.
511 484
512 -z <ext,...> Applies gzip content-encoding to file uploads with the given 485 -v Requests that the version-specific URL for each uploaded object
513 extensions. This is useful when uploading files with 486 be printed. Given this URL you can make future upload requests
514 compressible content (such as .js, .css, or .html files) because 487 that are safe in the face of concurrent updates, because Google
515 it saves network bandwidth and space in Google Cloud Storage, 488 Cloud Storage will refuse to perform the update if the current
516 which in turn reduces storage costs. 489 object version doesn't match the version-specific URL. See
490 'gsutil help versions' for more details.
517 491
518 When you specify the -z option, the data from your files is 492 -z <ext,...> Applies gzip content-encoding to file uploads with the given
519 compressed before it is uploaded, but your actual files are left 493 extensions. This is useful when uploading files with
520 uncompressed on the local disk. The uploaded objects retain the 494 compressible content (such as .js, .css, or .html files)
521 Content-Type and name of the original files but are given a 495 because it saves network bandwidth and space in Google Cloud
522 Content-Encoding header with the value "gzip" to indicate that 496 Storage, which in turn reduces storage costs.
523 the object data stored are compressed on the Google Cloud
524 Storage servers.
525 497
526 For example, the following command: 498 When you specify the -z option, the data from your files is
499 compressed before it is uploaded, but your actual files are
500 left uncompressed on the local disk. The uploaded objects
501 retain the Content-Type and name of the original files but are
502 given a Content-Encoding header with the value "gzip" to
503 indicate that the object data stored are compressed on the
504 Google Cloud Storage servers.
527 505
528 gsutil cp -z html -a public-read cattypes.html gs://mycats 506 For example, the following command:
529 507
530 will do all of the following: 508 gsutil cp -z html -a public-read cattypes.html gs://mycats
531 509
532 - Upload as the object gs://mycats/cattypes.html (cp command) 510 will do all of the following:
533 - Set the Content-Type to text/html (based on file extension) 511
534 - Compress the data in the file cattypes.html (-z option) 512 - Upload as the object gs://mycats/cattypes.html (cp command)
535 - Set the Content-Encoding to gzip (-z option) 513 - Set the Content-Type to text/html (based on file extension)
536 - Set the ACL to public-read (-a option) 514 - Compress the data in the file cattypes.html (-z option)
537 - If a user tries to view cattypes.html in a browser, the 515 - Set the Content-Encoding to gzip (-z option)
538 browser will know to uncompress the data based on the 516 - Set the ACL to public-read (-a option)
539 Content-Encoding header, and to render it as HTML based on 517 - If a user tries to view cattypes.html in a browser, the
540 the Content-Type header. 518 browser will know to uncompress the data based on the
519 Content-Encoding header, and to render it as HTML based on
520 the Content-Type header.
521
522 Note that if you download an object with Content-Encoding:gzip
523 gsutil will decompress the content before writing the local
524 file.
541 """ 525 """
542 526
543 _detailed_help_text = '\n\n'.join([SYNOPSIS_TEXT, 527 _DETAILED_HELP_TEXT = '\n\n'.join([SYNOPSIS_TEXT,
544 DESCRIPTION_TEXT, 528 DESCRIPTION_TEXT,
545 NAME_CONSTRUCTION_TEXT, 529 NAME_CONSTRUCTION_TEXT,
546 SUBDIRECTORIES_TEXT, 530 SUBDIRECTORIES_TEXT,
547 COPY_IN_CLOUD_TEXT, 531 COPY_IN_CLOUD_TEXT,
532 FAILURE_HANDLING_TEXT,
548 RESUMABLE_TRANSFERS_TEXT, 533 RESUMABLE_TRANSFERS_TEXT,
549 STREAMING_TRANSFERS_TEXT, 534 STREAMING_TRANSFERS_TEXT,
550 PARALLEL_COMPOSITE_UPLOADS_TEXT, 535 PARALLEL_COMPOSITE_UPLOADS_TEXT,
551 CHANGING_TEMP_DIRECTORIES_TEXT, 536 CHANGING_TEMP_DIRECTORIES_TEXT,
552 OPTIONS_TEXT]) 537 OPTIONS_TEXT])
553 538
554 # This tuple is used only to encapsulate the arguments needed for
555 # _PerformResumableUploadIfApplies, so that the arguments fit the model of
556 # command.Apply().
557 PerformResumableUploadIfAppliesArgs = namedtuple(
558 'PerformResumableUploadIfAppliesArgs',
559 'filename file_start file_length src_uri dst_uri canned_acl headers '
560 'tracker_file tracker_file_lock')
561 539
562 ObjectFromTracker = namedtuple('ObjectFromTracker', 540 CP_SUB_ARGS = 'a:cDeIL:MNnprRtvz:'
563 'object_name generation')
564
565 CP_SUB_ARGS = 'a:cDeIL:MNnpqrRtvz:'
566
567 # The maximum length of a file name can vary wildly between different
568 # operating systems, so we always ensure that tracker files are less
569 # than 100 characters in order to avoid any such issues.
570 MAX_TRACKER_FILE_NAME_LENGTH = 100
571 541
572 542
573 class TrackerFileType(object): 543 def _CopyFuncWrapper(cls, args, thread_state=None):
574 UPLOAD = 1 544 cls.CopyFunc(args, thread_state=thread_state)
575 DOWNLOAD = 2
576 PARALLEL_UPLOAD = 3
577 545
578 def _CopyFuncWrapper(cls, args):
579 cls._CopyFunc(args)
580
581 def _PerformResumableUploadIfAppliesWrapper(cls, args):
582 """A wrapper for cp._PerformResumableUploadIfApplies, which takes in a
583 PerformResumableUploadIfAppliesArgs, extracts the arguments to form the
584 arguments for the wrapped function, and then calls the wrapped function.
585 This was designed specifically for use with command.Apply().
586 """
587 fp = FilePart(args.filename, args.file_start, args.file_length)
588 with fp:
589 already_split = True
590 ret = cls._PerformResumableUploadIfApplies(
591 fp, args.src_uri, args.dst_uri, args.canned_acl, args.headers,
592 fp.length, already_split)
593
594 # Update the tracker file after each call in order to be as robust as possible
595 # against interrupts, failures, etc.
596 component = ret[2]
597 _AppendComponentTrackerToParallelUploadTrackerFile(args.tracker_file,
598 component,
599 args.tracker_file_lock)
600 return ret
601 546
602 def _CopyExceptionHandler(cls, e): 547 def _CopyExceptionHandler(cls, e):
603 """Simple exception handler to allow post-completion status.""" 548 """Simple exception handler to allow post-completion status."""
604 cls.logger.error(str(e)) 549 cls.logger.error(str(e))
605 cls.copy_failure_count += 1 550 cls.op_failure_count += 1
606 cls.logger.debug(('\n\nEncountered exception while copying:\n%s\n' % 551 cls.logger.debug('\n\nEncountered exception while copying:\n%s\n',
607 traceback.format_exc())) 552 traceback.format_exc())
553
608 554
609 def _RmExceptionHandler(cls, e): 555 def _RmExceptionHandler(cls, e):
610 """Simple exception handler to allow post-completion status.""" 556 """Simple exception handler to allow post-completion status."""
611 cls.logger.error(str(e)) 557 cls.logger.error(str(e))
612 558
559
613 class CpCommand(Command): 560 class CpCommand(Command):
614 """ 561 """Implementation of gsutil cp command.
615 Implementation of gsutil cp command.
616 562
617 Note that CpCommand is run for both gsutil cp and gsutil mv. The latter 563 Note that CpCommand is run for both gsutil cp and gsutil mv. The latter
618 happens by MvCommand calling CpCommand and passing the hidden (undocumented) 564 happens by MvCommand calling CpCommand and passing the hidden (undocumented)
619 -M option. This allows the copy and remove needed for each mv to run 565 -M option. This allows the copy and remove needed for each mv to run
620 together (rather than first running all the cp's and then all the rm's, as 566 together (rather than first running all the cp's and then all the rm's, as
621 we originally had implemented), which in turn avoids the following problem 567 we originally had implemented), which in turn avoids the following problem
622 with removing the wrong objects: starting with a bucket containing only 568 with removing the wrong objects: starting with a bucket containing only
623 the object gs://bucket/obj, say the user does: 569 the object gs://bucket/obj, say the user does:
624 gsutil mv gs://bucket/* gs://bucket/d.txt 570 gsutil mv gs://bucket/* gs://bucket/d.txt
625 If we ran all the cp's and then all the rm's and we didn't expand the wildcard 571 If we ran all the cp's and then all the rm's and we didn't expand the wildcard
626 first, the cp command would first copy gs://bucket/obj to gs://bucket/d.txt, 572 first, the cp command would first copy gs://bucket/obj to gs://bucket/d.txt,
627 and the rm command would then remove that object. In the implementation 573 and the rm command would then remove that object. In the implementation
628 prior to gsutil release 3.12 we avoided this by building a list of objects 574 prior to gsutil release 3.12 we avoided this by building a list of objects
629 to process and then running the copies and then the removes; but building 575 to process and then running the copies and then the removes; but building
630 the list up front limits scalability (compared with the current approach 576 the list up front limits scalability (compared with the current approach
631 of processing the bucket listing iterator on the fly). 577 of processing the bucket listing iterator on the fly).
632 """ 578 """
633 579
634 # TODO: Refactor this file to be less cumbersome. In particular, some of the 580 # Command specification. See base class for documentation.
635 # different paths (e.g., uploading a file to an object vs. downloading an 581 command_spec = Command.CreateCommandSpec(
636 # object to a file) could be split into separate files. 582 'cp',
637 583 command_name_aliases=['copy'],
638 # Set default Content-Type type. 584 min_args=1,
639 DEFAULT_CONTENT_TYPE = 'application/octet-stream' 585 max_args=NO_MAX,
640 USE_MAGICFILE = boto.config.getbool('GSUtil', 'use_magicfile', False) 586 # -t is deprecated but leave intact for now to avoid breakage.
641 # Chunk size to use while unzipping gzip files. 587 supported_sub_args=CP_SUB_ARGS,
642 GUNZIP_CHUNK_SIZE = 8192 588 file_url_ok=True,
643 589 provider_url_ok=False,
644 # Command specification (processed by parent class). 590 urls_start_arg=0,
645 command_spec = { 591 gs_api_support=[ApiSelector.XML, ApiSelector.JSON],
646 # Name of command. 592 gs_default_api=ApiSelector.JSON,
647 COMMAND_NAME : 'cp', 593 supported_private_args=['haltatbyte='],
648 # List of command name aliases. 594 )
649 COMMAND_NAME_ALIASES : ['copy'], 595 # Help specification. See help_provider.py for documentation.
650 # Min number of args required by this command. 596 help_spec = Command.HelpSpec(
651 MIN_ARGS : 1, 597 help_name='cp',
652 # Max number of args required by this command, or NO_MAX. 598 help_name_aliases=['copy'],
653 MAX_ARGS : NO_MAX, 599 help_type='command_help',
654 # Getopt-style string specifying acceptable sub args. 600 help_one_line_summary='Copy files and objects',
655 # -t is deprecated but leave intact for now to avoid breakage. 601 help_text=_DETAILED_HELP_TEXT,
656 SUPPORTED_SUB_ARGS : CP_SUB_ARGS, 602 subcommand_help_text={},
657 # True if file URIs acceptable for this command. 603 )
658 FILE_URIS_OK : True, 604
659 # True if provider-only URIs acceptable for this command. 605 # pylint: disable=too-many-statements
660 PROVIDER_URIS_OK : False, 606 def CopyFunc(self, name_expansion_result, thread_state=None):
661 # Index in args of first URI arg.
662 URIS_START_ARG : 0,
663 }
664 help_spec = {
665 # Name of command or auxiliary help info for which this help applies.
666 HELP_NAME : 'cp',
667 # List of help name aliases.
668 HELP_NAME_ALIASES : ['copy'],
669 # Type of help:
670 HELP_TYPE : HelpType.COMMAND_HELP,
671 # One line summary of this help.
672 HELP_ONE_LINE_SUMMARY : 'Copy files and objects',
673 # The full help text.
674 HELP_TEXT : _detailed_help_text,
675 }
676
677 def _GetMD5FromETag(self, key):
678 if not key.etag:
679 return None
680 possible_md5 = key.etag.strip('"\'').lower()
681 if re.match(r'^[0-9a-f]{32}$', possible_md5):
682 return binascii.a2b_hex(possible_md5)
683
684 def _CheckHashes(self, key, file_name, hash_algs_to_compute,
685 computed_hashes=None):
686 """Validates integrity by comparing cloud digest to local digest.
687
688 Args:
689 key: Instance of boto Key object.
690 file_name: Name of downloaded file on local disk.
691 hash_algs_to_compute: Dictionary mapping hash algorithm names to digester
692 objects.
693 computed_hashes: If specified, use this dictionary mapping hash algorithm
694 names to the calculated digest. If not specified, the
695 key argument will be checked for local_digests property.
696 If neither exist, the local file will be opened and
697 digests calculated on-demand.
698
699 Raises:
700 CommandException: if cloud digests don't match local digests.
701 """
702 cloud_hashes = {}
703 if hasattr(key, 'cloud_hashes'):
704 cloud_hashes = key.cloud_hashes
705 # Check for older-style MD5-based etag.
706 etag_md5 = self._GetMD5FromETag(key)
707 if etag_md5:
708 cloud_hashes.setdefault('md5', etag_md5)
709
710 local_hashes = {}
711 # If we've already computed a valid local hash, use that, else calculate an
712 # md5 or crc32c depending on what we have available to compare against.
713 if computed_hashes:
714 local_hashes = computed_hashes
715 elif hasattr(key, 'local_hashes') and key.local_hashes:
716 local_hashes = key.local_hashes
717 elif 'md5' in cloud_hashes and 'md5' in hash_algs_to_compute:
718 self.logger.info(
719 'Computing MD5 from scratch for resumed download')
720
721 # Open file in binary mode to avoid surprises in Windows.
722 with open(file_name, 'rb') as fp:
723 local_hashes['md5'] = binascii.a2b_hex(key.compute_md5(fp)[0])
724 elif 'crc32c' in cloud_hashes and 'crc32c' in hash_algs_to_compute:
725 self.logger.info(
726 'Computing CRC32C from scratch for resumed download')
727
728 # Open file in binary mode to avoid surprises in Windows.
729 with open(file_name, 'rb') as fp:
730 crc32c_alg = lambda: crcmod.predefined.Crc('crc-32c')
731 crc32c_hex = key.compute_hash(
732 fp, algorithm=crc32c_alg)[0]
733 local_hashes['crc32c'] = binascii.a2b_hex(crc32c_hex)
734
735 for alg in local_hashes:
736 if alg not in cloud_hashes:
737 continue
738 local_hexdigest = binascii.b2a_hex(local_hashes[alg])
739 cloud_hexdigest = binascii.b2a_hex(cloud_hashes[alg])
740 self.logger.debug('Comparing local vs cloud %s-checksum. (%s/%s)' % (
741 alg, local_hexdigest, cloud_hexdigest))
742 if local_hexdigest != cloud_hexdigest:
743 raise CommandException(
744 '%s signature computed for local file (%s) doesn\'t match '
745 'cloud-supplied digest (%s). Local file (%s) deleted.' % (
746 alg, local_hexdigest, cloud_hexdigest, file_name))
747
748 def _CheckForDirFileConflict(self, exp_src_uri, dst_uri):
749 """Checks whether copying exp_src_uri into dst_uri is not possible.
750
751 This happens if a directory exists in local file system where a file
752 needs to go or vice versa. In that case we print an error message and
753 exits. Example: if the file "./x" exists and you try to do:
754 gsutil cp gs://mybucket/x/y .
755 the request can't succeed because it requires a directory where
756 the file x exists.
757
758 Note that we don't enforce any corresponding restrictions for buckets,
759 because the flat namespace semantics for buckets doesn't prohibit such
760 cases the way hierarchical file systems do. For example, if a bucket
761 contains an object called gs://bucket/dir and then you run the command:
762 gsutil cp file1 file2 gs://bucket/dir
763 you'll end up with objects gs://bucket/dir, gs://bucket/dir/file1, and
764 gs://bucket/dir/file2.
765
766 Args:
767 exp_src_uri: Expanded source StorageUri of copy.
768 dst_uri: Destination URI.
769
770 Raises:
771 CommandException: if errors encountered.
772 """
773 if dst_uri.is_cloud_uri():
774 # The problem can only happen for file destination URIs.
775 return
776 dst_path = dst_uri.object_name
777 final_dir = os.path.dirname(dst_path)
778 if os.path.isfile(final_dir):
779 raise CommandException('Cannot retrieve %s because a file exists '
780 'where a directory needs to be created (%s).' %
781 (exp_src_uri, final_dir))
782 if os.path.isdir(dst_path):
783 raise CommandException('Cannot retrieve %s because a directory exists '
784 '(%s) where the file needs to be created.' %
785 (exp_src_uri, dst_path))
786
787 def _InsistDstUriNamesContainer(self, exp_dst_uri,
788 have_existing_dst_container, command_name):
789 """
790 Raises an exception if URI doesn't name a directory, bucket, or bucket
791 subdir, with special exception for cp -R (see comments below).
792
793 Args:
794 exp_dst_uri: Wildcard-expanding dst_uri.
795 have_existing_dst_container: bool indicator of whether exp_dst_uri
796 names a container (directory, bucket, or existing bucket subdir).
797 command_name: Name of command making call. May not be the same as
798 self.command_name in the case of commands implemented atop other
799 commands (like mv command).
800
801 Raises:
802 CommandException: if the URI being checked does not name a container.
803 """
804 if exp_dst_uri.is_file_uri():
805 ok = exp_dst_uri.names_directory()
806 else:
807 if have_existing_dst_container:
808 ok = True
809 else:
810 # It's ok to specify a non-existing bucket subdir, for example:
811 # gsutil cp -R dir gs://bucket/abc
812 # where gs://bucket/abc isn't an existing subdir.
813 ok = exp_dst_uri.names_object()
814 if not ok:
815 raise CommandException('Destination URI must name a directory, bucket, '
816 'or bucket\nsubdirectory for the multiple '
817 'source form of the %s command.' % command_name)
818
819 class _FileCopyCallbackHandler(object):
820 """Outputs progress info for large copy requests."""
821
822 def __init__(self, upload, logger):
823 if upload:
824 self.announce_text = 'Uploading'
825 else:
826 self.announce_text = 'Downloading'
827 self.logger = logger
828
829 def call(self, total_bytes_transferred, total_size):
830 # Use sys.stderr.write instead of self.logger.info so progress messages
831 # output on a single continuously overwriting line.
832 if self.logger.isEnabledFor(logging.INFO):
833 sys.stderr.write('%s: %s/%s \r' % (
834 self.announce_text,
835 MakeHumanReadable(total_bytes_transferred),
836 MakeHumanReadable(total_size)))
837 if total_bytes_transferred == total_size:
838 sys.stderr.write('\n')
839
840 class _StreamCopyCallbackHandler(object):
841 """Outputs progress info for Stream copy to cloud.
842 Total Size of the stream is not known, so we output
843 only the bytes transferred.
844 """
845
846 def __init__(self, logger):
847 self.logger = logger
848
849 def call(self, total_bytes_transferred, total_size):
850 # Use sys.stderr.write instead of self.logger.info so progress messages
851 # output on a single continuously overwriting line.
852 if self.logger.isEnabledFor(logging.INFO):
853 sys.stderr.write('Uploading: %s \r' %
854 MakeHumanReadable(total_bytes_transferred))
855 if total_size and total_bytes_transferred == total_size:
856 sys.stderr.write('\n')
857
858 def _GetTransferHandlers(self, dst_uri, size, upload):
859 """
860 Selects upload/download and callback handlers.
861
862 We use a callback handler that shows a simple textual progress indicator
863 if size is above the configurable threshold.
864
865 We use a resumable transfer handler if size is >= the configurable
866 threshold and resumable transfers are supported by the given provider.
867 boto supports resumable downloads for all providers, but resumable
868 uploads are currently only supported by GS.
869
870 Args:
871 dst_uri: the destination URI.
872 size: size of file (object) being uploaded (downloaded).
873 upload: bool indication of whether transfer is an upload.
874 """
875 config = boto.config
876 resumable_threshold = config.getint('GSUtil', 'resumable_threshold', TWO_MB)
877 transfer_handler = None
878 cb = None
879 num_cb = None
880
881 # Checks whether the destination file is a "special" file, like /dev/null on
882 # Linux platforms or null on Windows platforms, so we can disable resumable
883 # download support since the file size of the destination won't ever be
884 # correct.
885 dst_is_special = False
886 if dst_uri.is_file_uri():
887 # Check explicitly first because os.stat doesn't work on 'nul' in Windows.
888 if dst_uri.object_name == os.devnull:
889 dst_is_special = True
890 try:
891 mode = os.stat(dst_uri.object_name).st_mode
892 if stat.S_ISCHR(mode):
893 dst_is_special = True
894 except OSError:
895 pass
896
897 if size >= resumable_threshold and not dst_is_special:
898 cb = self._FileCopyCallbackHandler(upload, self.logger).call
899 num_cb = int(size / TWO_MB)
900
901 if upload:
902 tracker_file_type = TrackerFileType.UPLOAD
903 else:
904 tracker_file_type = TrackerFileType.DOWNLOAD
905 tracker_file = self._GetTrackerFilePath(dst_uri, tracker_file_type)
906
907 if upload:
908 if dst_uri.scheme == 'gs':
909 is_secure = BOTO_IS_SECURE
910 if not is_secure[0]:
911 self.logger.info('\n'.join(textwrap.wrap(
912 'WARNING: Your boto config file (%s) has is_secure set to '
913 'False. Resumable uploads are not secure when performed with '
914 'this configuration, so large files are being uploaded with '
915 'non-resumable uploads instead.' % GetConfigFilePath())))
916 else:
917 transfer_handler = ResumableUploadHandler(tracker_file)
918 else:
919 transfer_handler = ResumableDownloadHandler(tracker_file)
920
921 return (cb, num_cb, transfer_handler)
922
923 def _GetTrackerFilePath(self, dst_uri, tracker_file_type, src_uri=None):
924 resumable_tracker_dir = CreateTrackerDirIfNeeded()
925 if tracker_file_type == TrackerFileType.UPLOAD:
926 # Encode the dest bucket and object name into the tracker file name.
927 res_tracker_file_name = (
928 re.sub('[/\\\\]', '_', 'resumable_upload__%s__%s.url' %
929 (dst_uri.bucket_name, dst_uri.object_name)))
930 tracker_file_type_str = "upload"
931 elif tracker_file_type == TrackerFileType.DOWNLOAD:
932 # Encode the fully-qualified dest file name into the tracker file name.
933 res_tracker_file_name = (
934 re.sub('[/\\\\]', '_', 'resumable_download__%s.etag' %
935 (os.path.realpath(dst_uri.object_name))))
936 tracker_file_type_str = "download"
937 elif tracker_file_type == TrackerFileType.PARALLEL_UPLOAD:
938 # Encode the dest bucket and object names as well as the source file name
939 # into the tracker file name.
940 res_tracker_file_name = (
941 re.sub('[/\\\\]', '_', 'parallel_upload__%s__%s__%s.url' %
942 (dst_uri.bucket_name, dst_uri.object_name, src_uri)))
943 tracker_file_type_str = "parallel_upload"
944
945 res_tracker_file_name = _HashFilename(res_tracker_file_name)
946 tracker_file_name = '%s_%s' % (tracker_file_type_str, res_tracker_file_name)
947 tracker_file_path = '%s%s%s' % (resumable_tracker_dir, os.sep,
948 tracker_file_name)
949 assert(len(tracker_file_name) < MAX_TRACKER_FILE_NAME_LENGTH)
950 return tracker_file_path
951
952 def _LogCopyOperation(self, src_uri, dst_uri, headers):
953 """
954 Logs copy operation being performed, including Content-Type if appropriate.
955 """
956 if 'content-type' in headers and dst_uri.is_cloud_uri():
957 content_type_msg = ' [Content-Type=%s]' % headers['content-type']
958 else:
959 content_type_msg = ''
960 if src_uri.is_stream():
961 self.logger.info('Copying from <STDIN>%s...', content_type_msg)
962 else:
963 self.logger.info('Copying %s%s...', src_uri, content_type_msg)
964
965 def _ProcessCopyObjectToObjectOptions(self, dst_uri, headers):
966 """
967 Common option processing between _CopyObjToObjInTheCloud and
968 _CopyObjToObjDaisyChainMode.
969 """
970 preserve_acl = False
971 canned_acl = None
972 if self.sub_opts:
973 for o, a in self.sub_opts:
974 if o == '-a':
975 canned_acls = dst_uri.canned_acls()
976 if a not in canned_acls:
977 raise CommandException('Invalid canned ACL "%s".' % a)
978 canned_acl = a
979 headers[dst_uri.get_provider().acl_header] = canned_acl
980 if o == '-p':
981 preserve_acl = True
982 if preserve_acl and canned_acl:
983 raise CommandException(
984 'Specifying both the -p and -a options together is invalid.')
985 return (preserve_acl, canned_acl, headers)
986
987 # We pass the headers explicitly to this call instead of using self.headers
988 # so we can set different metadata (like Content-Type type) for each object.
989 def _CopyObjToObjInTheCloud(self, src_key, src_uri, dst_uri, headers):
990 """Performs copy-in-the cloud from specified src to dest object.
991
992 Args:
993 src_key: Source Key.
994 src_uri: Source StorageUri.
995 dst_uri: Destination StorageUri.
996 headers: A copy of the top-level headers dictionary.
997
998 Returns:
999 (elapsed_time, bytes_transferred, dst_uri) excluding overhead like initial
1000 HEAD. Note: At present copy-in-the-cloud doesn't return the generation of
1001 the created object, so the returned URI is actually not version-specific
1002 (unlike other cp cases).
1003
1004 Raises:
1005 CommandException: if errors encountered.
1006 """
1007 self._SetContentTypeHeader(src_uri, headers)
1008 self._LogCopyOperation(src_uri, dst_uri, headers)
1009 # Do Object -> object copy within same provider (uses
1010 # x-<provider>-copy-source metadata HTTP header to request copying at the
1011 # server).
1012 src_bucket = src_uri.get_bucket(False, headers)
1013 (preserve_acl, canned_acl, headers) = (
1014 self._ProcessCopyObjectToObjectOptions(dst_uri, headers))
1015 start_time = time.time()
1016 # Pass headers in headers param not metadata param, so boto will copy
1017 # existing key's metadata and just set the additional headers specified
1018 # in the headers param (rather than using the headers to override existing
1019 # metadata). In particular this allows us to copy the existing key's
1020 # Content-Type and other metadata users need while still being able to
1021 # set headers the API needs (like x-goog-project-id). Note that this means
1022 # you can't do something like:
1023 # gsutil cp -t Content-Type text/html gs://bucket/* gs://bucket2
1024 # to change the Content-Type while copying.
1025 dst_key = dst_uri.copy_key(
1026 src_bucket.name, src_uri.object_name, preserve_acl=preserve_acl,
1027 headers=headers, src_version_id=src_uri.version_id,
1028 src_generation=src_uri.generation)
1029
1030 end_time = time.time()
1031 return (end_time - start_time, src_key.size,
1032 dst_uri.clone_replace_key(dst_key))
1033
1034 def _CheckFreeSpace(self, path):
1035 """Return path/drive free space (in bytes)."""
1036 if platform.system() == 'Windows':
1037 from ctypes import c_int, c_uint64, c_wchar_p, windll, POINTER, WINFUNCTYP E, WinError
1038 try:
1039 GetDiskFreeSpaceEx = WINFUNCTYPE(c_int, c_wchar_p, POINTER(c_uint64),
1040 POINTER(c_uint64), POINTER(c_uint64))
1041 GetDiskFreeSpaceEx = GetDiskFreeSpaceEx(
1042 ('GetDiskFreeSpaceExW', windll.kernel32), (
1043 (1, 'lpszPathName'),
1044 (2, 'lpFreeUserSpace'),
1045 (2, 'lpTotalSpace'),
1046 (2, 'lpFreeSpace'),))
1047 except AttributeError:
1048 GetDiskFreeSpaceEx = WINFUNCTYPE(c_int, c_char_p, POINTER(c_uint64),
1049 POINTER(c_uint64), POINTER(c_uint64))
1050 GetDiskFreeSpaceEx = GetDiskFreeSpaceEx(
1051 ('GetDiskFreeSpaceExA', windll.kernel32), (
1052 (1, 'lpszPathName'),
1053 (2, 'lpFreeUserSpace'),
1054 (2, 'lpTotalSpace'),
1055 (2, 'lpFreeSpace'),))
1056
1057 def GetDiskFreeSpaceEx_errcheck(result, func, args):
1058 if not result:
1059 raise WinError()
1060 return args[1].value
1061 GetDiskFreeSpaceEx.errcheck = GetDiskFreeSpaceEx_errcheck
1062
1063 return GetDiskFreeSpaceEx(os.getenv('SystemDrive'))
1064 else:
1065 (_, f_frsize, _, _, f_bavail, _, _, _, _, _) = os.statvfs(path)
1066 return f_frsize * f_bavail
1067
  def _PerformResumableUploadIfApplies(self, fp, src_uri, dst_uri, canned_acl,
                                       headers, file_size, already_split=False):
    """
    Performs resumable upload if supported by provider and file is above
    threshold, else performs non-resumable upload.

    Args:
      fp: The file object to upload.
      src_uri: Source StorageUri (used here only for manifest bookkeeping).
      dst_uri: Destination StorageUri.
      canned_acl: Optional canned ACL to apply to the uploaded object.
      headers: The headers to pass to boto.
      file_size: Size of fp, in bytes.
      already_split: True when fp is a temporary component of a parallel
          composite upload; suppresses the manifest upload_id entry.

    Returns (elapsed_time, bytes_transferred, version-specific dst_uri).
    """
    start_time = time.time()
    (cb, num_cb, res_upload_handler) = self._GetTransferHandlers(
        dst_uri, file_size, True)
    if dst_uri.scheme == 'gs':
      # Resumable upload protocol is Google Cloud Storage-specific.
      dst_uri.set_contents_from_file(fp, headers, policy=canned_acl,
                                     cb=cb, num_cb=num_cb,
                                     res_upload_handler=res_upload_handler)
    else:
      # Non-GCS providers get a plain (non-resumable) upload.
      dst_uri.set_contents_from_file(fp, headers, policy=canned_acl,
                                     cb=cb, num_cb=num_cb)
    if res_upload_handler:
      # ResumableUploadHandler does not update upload_start_point from its
      # initial value of -1 if transferring the whole file, so clamp at 0
      bytes_transferred = file_size - max(
          res_upload_handler.upload_start_point, 0)
      if self.use_manifest and not already_split:
        # Save the upload identifier in the manifest file, unless we're
        # uploading a temporary component for parallel composite uploads.
        self.manifest.Set(
            src_uri, 'upload_id', res_upload_handler.get_upload_id())
    else:
      bytes_transferred = file_size
    end_time = time.time()
    return (end_time - start_time, bytes_transferred, dst_uri)
1101
1102 def _PerformStreamingUpload(self, fp, dst_uri, headers, canned_acl=None):
1103 """
1104 Performs a streaming upload to the cloud.
1105
1106 Args:
1107 fp: The file whose contents to upload.
1108 dst_uri: Destination StorageUri.
1109 headers: A copy of the top-level headers dictionary.
1110 canned_acl: Optional canned ACL to set on the object.
1111
1112 Returns (elapsed_time, bytes_transferred, version-specific dst_uri).
1113 """
1114 start_time = time.time()
1115
1116 cb = self._StreamCopyCallbackHandler(self.logger).call
1117 dst_uri.set_contents_from_stream(
1118 fp, headers, policy=canned_acl, cb=cb)
1119 try:
1120 bytes_transferred = fp.tell()
1121 except:
1122 bytes_transferred = 0
1123
1124 end_time = time.time()
1125 return (end_time - start_time, bytes_transferred, dst_uri)
1126
  def _SetContentTypeHeader(self, src_uri, headers):
    """
    Sets content type header to value specified in '-h Content-Type' option (if
    specified); else sets using Content-Type detection.

    Args:
      src_uri: Source StorageUri being copied.
      headers: Headers dict; mutated in place to carry the content-type.
    """
    if 'content-type' in headers:
      # If empty string specified (i.e., -h "Content-Type:") set header to None,
      # which will inhibit boto from sending the CT header. Otherwise, boto will
      # pass through the user specified CT header.
      if not headers['content-type']:
        headers['content-type'] = None
      # else we'll keep the value passed in via -h option (not performing
      # content type detection).
    else:
      # Only do content type recognition if src_uri is a file. Object-to-object
      # copies with no -h Content-Type specified re-use the content type of the
      # source object.
      if src_uri.is_file_uri():
        object_name = src_uri.object_name
        content_type = None
        # Streams (denoted by '-') are expected to be 'application/octet-stream'
        # and 'file' would partially consume them.
        if object_name != '-':
          if self.USE_MAGICFILE:
            # Detect via the OS-level 'file' command rather than mimetypes.
            p = subprocess.Popen(['file', '--mime-type', object_name],
                                 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            output, error = p.communicate()
            if p.returncode != 0 or error:
              raise CommandException(
                  'Encountered error running "file --mime-type %s" '
                  '(returncode=%d).\n%s' % (object_name, p.returncode, error))
            # Parse output by removing line delimiter and splitting on last ":
            content_type = output.rstrip().rpartition(': ')[2]
          else:
            content_type = mimetypes.guess_type(object_name)[0]
        if not content_type:
          content_type = self.DEFAULT_CONTENT_TYPE
        headers['content-type'] = content_type
1165
1166 def _GetFileSize(self, fp):
1167 """Determines file size different ways for case where fp is actually a
1168 wrapper around a Key vs an actual file.
1169
1170 Args:
1171 The file whose size we wish to determine.
1172
1173 Returns:
1174 The size of the file, in bytes.
1175 """
1176 if isinstance(fp, KeyFile):
1177 return fp.getkey().size
1178 else:
1179 return os.path.getsize(fp.name)
1180
  def _UploadFileToObject(self, src_key, src_uri, dst_uri, headers,
                          should_log=True, allow_splitting=True):
    """Uploads a local file to an object.

    Handles -z gzip compression, streaming sources (with and without provider
    chunked-transfer support), and parallel composite uploads.

    Args:
      src_key: Source Key wrapping the local file. Must be a file URI's key.
      src_uri: Source StorageUri.
      dst_uri: Destination StorageUri.
      headers: The headers dictionary.
      should_log: bool indicator whether we should log this operation.
      allow_splitting: bool indicator whether the file may be split into
          components for a parallel composite upload.
    Returns:
      (elapsed_time, bytes_transferred, version-specific dst_uri), excluding
      overhead like initial HEAD.

    Raises:
      CommandException: if errors encountered.
    """
    gzip_exts = []
    canned_acl = None
    if self.sub_opts:
      for o, a in self.sub_opts:
        if o == '-a':
          canned_acls = dst_uri.canned_acls()
          if a not in canned_acls:
            raise CommandException('Invalid canned ACL "%s".' % a)
          canned_acl = a
        elif o == '-t':
          self.logger.warning(
              'Warning: -t is deprecated, and will be removed in the future. '
              'Content type\ndetection is '
              'now performed by default, unless inhibited by specifying '
              'a\nContent-Type header via the -h option.')
        elif o == '-z':
          # -z takes a comma-separated list of extensions to gzip on upload.
          gzip_exts = a.split(',')

    self._SetContentTypeHeader(src_uri, headers)
    if should_log:
      self._LogCopyOperation(src_uri, dst_uri, headers)

    if 'content-language' not in headers:
      # Fall back to the boto-config content_language, if set.
      content_language = config.get_value('GSUtil', 'content_language')
      if content_language:
        headers['content-language'] = content_language

    fname_parts = src_uri.object_name.split('.')
    if len(fname_parts) > 1 and fname_parts[-1] in gzip_exts:
      # File extension matches -z: compress to a temp file, upload that.
      self.logger.debug('Compressing %s (to tmp)...', src_key)
      (gzip_fh, gzip_path) = tempfile.mkstemp()
      gzip_fp = None
      try:
        # Check for temp space. Assume the compressed object is at most 2x
        # the size of the object (normally should compress to smaller than
        # the object)
        if (self._CheckFreeSpace(gzip_path)
            < 2*int(os.path.getsize(src_key.name))):
          raise CommandException('Inadequate temp space available to compress '
                                 '%s' % src_key.name)
        gzip_fp = gzip.open(gzip_path, 'wb')
        gzip_fp.writelines(src_key.fp)
      finally:
        if gzip_fp:
          gzip_fp.close()
        os.close(gzip_fh)
      headers['content-encoding'] = 'gzip'
      gzip_fp = open(gzip_path, 'rb')
      try:
        file_size = self._GetFileSize(gzip_fp)
        (elapsed_time, bytes_transferred, result_uri) = (
            self._PerformResumableUploadIfApplies(gzip_fp, src_uri, dst_uri,
                                                  canned_acl, headers,
                                                  file_size))
      finally:
        gzip_fp.close()
        try:
          os.unlink(gzip_path)
        # Windows sometimes complains the temp file is locked when you try to
        # delete it.
        except Exception, e:
          pass
    elif (src_key.is_stream()
          and dst_uri.get_provider().supports_chunked_transfer()):
      # Stream source and the provider can accept chunked transfers directly.
      (elapsed_time, bytes_transferred, result_uri) = (
          self._PerformStreamingUpload(src_key.fp, dst_uri, headers,
                                       canned_acl))
    else:
      if src_key.is_stream():
        # For Providers that don't support chunked Transfers: buffer the
        # stream to a temp file first, then upload that file.
        tmp = tempfile.NamedTemporaryFile()
        file_uri = self.suri_builder.StorageUri('file://%s' % tmp.name)
        try:
          file_uri.new_key(False, headers).set_contents_from_file(
              src_key.fp, headers)
          src_key = file_uri.get_key()
        finally:
          file_uri.close()
      try:
        fp = src_key.fp
        file_size = self._GetFileSize(fp)
        if self._ShouldDoParallelCompositeUpload(allow_splitting, src_key,
                                                 dst_uri, file_size):
          (elapsed_time, bytes_transferred, result_uri) = (
              self._DoParallelCompositeUpload(fp, src_uri, dst_uri, headers,
                                              canned_acl, file_size))
        else:
          (elapsed_time, bytes_transferred, result_uri) = (
              self._PerformResumableUploadIfApplies(
                  src_key.fp, src_uri, dst_uri, canned_acl, headers,
                  self._GetFileSize(src_key.fp)))
      finally:
        if src_key.is_stream():
          tmp.close()
        else:
          src_key.close()
    return (elapsed_time, bytes_transferred, result_uri)
1295
  def _GetHashAlgs(self, key):
    """Determines which hash algorithms to use for validating key's contents.

    Consults the boto 'check_hashes' config value ('never', 'always',
    'if_fast_else_fail', or 'if_fast_else_skip') together with the hashes the
    cloud provider supplied for key.

    Args:
      key: The Key whose download will be validated.

    Returns:
      Dict mapping algorithm name ('md5', 'crc32c') to a zero-arg digester
      factory; empty when hash checking is disabled or skipped.

    Raises:
      CommandException: if the check_hashes config value is unrecognized.
    """
    hash_algs = {}
    check_hashes_config = config.get(
        'GSUtil', 'check_hashes', 'if_fast_else_fail')
    if check_hashes_config == 'never':
      return hash_algs
    if self._GetMD5FromETag(key):
      hash_algs['md5'] = md5
    if hasattr(key, 'cloud_hashes') and key.cloud_hashes:
      if 'md5' in key.cloud_hashes:
        hash_algs['md5'] = md5
      # If the cloud provider supplies a CRC, we'll compute a checksum to
      # validate if we're using a native crcmod installation or MD5 isn't
      # offered as an alternative.
      if 'crc32c' in key.cloud_hashes:
        if UsingCrcmodExtension(crcmod):
          hash_algs['crc32c'] = lambda: crcmod.predefined.Crc('crc-32c')
        elif not hash_algs:
          # Only pure-Python crcmod available and no MD5 alternative:
          # behavior depends on the check_hashes config.
          if check_hashes_config == 'if_fast_else_fail':
            raise SLOW_CRC_EXCEPTION
          elif check_hashes_config == 'if_fast_else_skip':
            sys.stderr.write(NO_HASH_CHECK_WARNING)
          elif check_hashes_config == 'always':
            sys.stderr.write(SLOW_CRC_WARNING)
            hash_algs['crc32c'] = lambda: crcmod.predefined.Crc('crc-32c')
          else:
            raise CommandException(
                'Your boto config \'check_hashes\' option is misconfigured.')
    elif not hash_algs:
      # Server supplied no hashes at all.
      if check_hashes_config == 'if_fast_else_skip':
        sys.stderr.write(NO_SERVER_HASH_WARNING)
      else:
        raise NO_SERVER_HASH_EXCEPTION
    return hash_algs
1330
  def _DownloadObjectToFile(self, src_key, src_uri, dst_uri, headers,
                            should_log=True):
    """Downloads an object to a local file.

    Handles resumable downloads, gzip-encoded objects (downloaded to a temp
    file and decompressed), and checksum validation of the result.

    Args:
      src_key: Source Key.
      src_uri: Source StorageUri.
      dst_uri: Destination StorageUri.
      headers: The headers dictionary.
      should_log: bool indicator whether we should log this operation.
    Returns:
      (elapsed_time, bytes_transferred, dst_uri), excluding overhead like
      initial HEAD.

    Raises:
      CommandException: if errors encountered.
    """
    if should_log:
      self._LogCopyOperation(src_uri, dst_uri, headers)
    (cb, num_cb, res_download_handler) = self._GetTransferHandlers(
        dst_uri, src_key.size, False)
    file_name = dst_uri.object_name
    dir_name = os.path.dirname(file_name)
    if dir_name and not os.path.exists(dir_name):
      # Do dir creation in try block so can ignore case where dir already
      # exists. This is needed to avoid a race condition when running gsutil
      # -m cp.
      try:
        os.makedirs(dir_name)
      except OSError, e:
        if e.errno != errno.EEXIST:
          raise
    # For gzipped objects, download to a temp file and unzip.
    if (hasattr(src_key, 'content_encoding')
        and src_key.content_encoding == 'gzip'):
      # We can't use tempfile.mkstemp() here because we need a predictable
      # filename for resumable downloads.
      download_file_name = '%s_.gztmp' % file_name
      need_to_unzip = True
    else:
      download_file_name = file_name
      need_to_unzip = False

    hash_algs = self._GetHashAlgs(src_key)

    # Add accept encoding for download operation.
    AddAcceptEncoding(headers)

    fp = None
    try:
      if res_download_handler:
        # Append mode so a partially-downloaded file is resumed, not clobbered.
        fp = open(download_file_name, 'ab')
      else:
        fp = open(download_file_name, 'wb')
      start_time = time.time()
      # Use our hash_algs if get_contents_to_file() will accept them, else the
      # default (md5-only) will suffice.
      try:
        src_key.get_contents_to_file(fp, headers, cb=cb, num_cb=num_cb,
                                     res_download_handler=res_download_handler,
                                     hash_algs=hash_algs)
      except TypeError:
        # Older boto signature without hash_algs support.
        src_key.get_contents_to_file(fp, headers, cb=cb, num_cb=num_cb,
                                     res_download_handler=res_download_handler)

      # If a custom test method is defined, call it here. For the copy command,
      # test methods are expected to take one argument: an open file pointer,
      # and are used to perturb the open file during download to exercise
      # download error detection.
      if self.test_method:
        self.test_method(fp)
      end_time = time.time()
    finally:
      if fp:
        fp.close()

    if (not need_to_unzip and
        hasattr(src_key, 'content_encoding')
        and src_key.content_encoding == 'gzip'):
      # TODO: HEAD requests are currently not returning proper Content-Encoding
      # headers when an object is gzip-encoded on-the-fly. Remove this once
      # it's fixed.
      renamed_file_name = '%s_.gztmp' % file_name
      os.rename(download_file_name, renamed_file_name)
      download_file_name = renamed_file_name
      need_to_unzip = True

    # Discard all hashes if we are resuming a partial download.
    if res_download_handler and res_download_handler.download_start_point:
      src_key.local_hashes = {}

    # Verify downloaded file checksum matched source object's checksum.
    digest_verified = True
    computed_hashes = None
    try:
      self._CheckHashes(src_key, download_file_name, hash_algs)
    except CommandException, e:
      # If the digest doesn't match, we'll try checking it again after
      # unzipping.
      if (not need_to_unzip or
          'doesn\'t match cloud-supplied digest' not in str(e)):
        os.unlink(download_file_name)
        raise
      digest_verified = False
      # Fresh digesters to recompute over the uncompressed data below.
      computed_hashes = dict(
          (alg, digester())
          for alg, digester in self._GetHashAlgs(src_key).iteritems())

    if res_download_handler:
      bytes_transferred = (
          src_key.size - res_download_handler.download_start_point)
    else:
      bytes_transferred = src_key.size

    if need_to_unzip:
      # Log that we're uncompressing if the file is big enough that
      # decompressing would make it look like the transfer "stalled" at the end.
      if bytes_transferred > 10 * 1024 * 1024:
        self.logger.info('Uncompressing downloaded tmp file to %s...',
                         file_name)

      # Downloaded gzipped file to a filename w/o .gz extension, so unzip.
      f_in = gzip.open(download_file_name, 'rb')
      with open(file_name, 'wb') as f_out:
        data = f_in.read(self.GUNZIP_CHUNK_SIZE)
        while data:
          f_out.write(data)
          if computed_hashes:
            # Compute digests again on the uncompressed data.
            for alg in computed_hashes.itervalues():
              alg.update(data)
          data = f_in.read(self.GUNZIP_CHUNK_SIZE)
      f_in.close()

      os.unlink(download_file_name)

    if not digest_verified:
      # Re-validate using the digests computed over the uncompressed data.
      computed_hashes = dict((alg, digester.digest())
                             for alg, digester in computed_hashes.iteritems())
      try:
        self._CheckHashes(
            src_key, file_name, hash_algs, computed_hashes=computed_hashes)
      except CommandException, e:
        os.unlink(file_name)
        raise

    return (end_time - start_time, bytes_transferred, dst_uri)
1478
1479 def _PerformDownloadToStream(self, src_key, src_uri, str_fp, headers):
1480 (cb, num_cb, res_download_handler) = self._GetTransferHandlers(
1481 src_uri, src_key.size, False)
1482 start_time = time.time()
1483 src_key.get_contents_to_file(str_fp, headers, cb=cb, num_cb=num_cb)
1484 end_time = time.time()
1485 bytes_transferred = src_key.size
1486 end_time = time.time()
1487 return (end_time - start_time, bytes_transferred)
1488
1489 def _CopyFileToFile(self, src_key, src_uri, dst_uri, headers):
1490 """Copies a local file to a local file.
1491
1492 Args:
1493 src_key: Source StorageUri. Must be a file URI.
1494 src_uri: Source StorageUri.
1495 dst_uri: Destination StorageUri.
1496 headers: The headers dictionary.
1497 Returns:
1498 (elapsed_time, bytes_transferred, dst_uri), excluding
1499 overhead like initial HEAD.
1500
1501 Raises:
1502 CommandException: if errors encountered.
1503 """
1504 self._LogCopyOperation(src_uri, dst_uri, headers)
1505 dst_key = dst_uri.new_key(False, headers)
1506 start_time = time.time()
1507 dst_key.set_contents_from_file(src_key.fp, headers)
1508 end_time = time.time()
1509 return (end_time - start_time, os.path.getsize(src_key.fp.name), dst_uri)
1510
  def _CopyObjToObjDaisyChainMode(self, src_key, src_uri, dst_uri, headers):
    """Copies from src_uri to dst_uri in "daisy chain" mode.
    See -D OPTION documentation about what daisy chain mode is.

    Args:
      src_key: Source Key.
      src_uri: Source StorageUri.
      dst_uri: Destination StorageUri.
      headers: A copy of the top-level headers dictionary.

    Returns:
      (elapsed_time, bytes_transferred, version-specific dst_uri) excluding
      overhead like initial HEAD.

    Raises:
      CommandException: if errors encountered.
      NotImplementedError: if -p was given for a cross-provider copy.
    """
    # Start with copy of input headers, so we'll include any headers that need
    # to be set from higher up in call stack (like x-goog-if-generation-match).
    headers = headers.copy()
    # Now merge headers from src_key so we'll preserve metadata.
    # Unfortunately boto separates headers into ones it puts in the metadata
    # dict and ones it pulls out into specific key fields, so we need to walk
    # through the latter list to find the headers that we copy over to the dest
    # object.
    for header_name, field_name in (
        ('cache-control', 'cache_control'),
        ('content-type', 'content_type'),
        ('content-language', 'content_language'),
        ('content-encoding', 'content_encoding'),
        ('content-disposition', 'content_disposition')):
      value = getattr(src_key, field_name, None)
      if value:
        headers[header_name] = value
    # Boto represents x-goog-meta-* headers in metadata dict with the
    # x-goog-meta- or x-amz-meta- prefix stripped. Turn these back into headers
    # for the destination object.
    for name, value in src_key.metadata.items():
      header_name = '%smeta-%s' % (dst_uri.get_provider().header_prefix, name)
      headers[header_name] = value
    # Set content type if specified in '-h Content-Type' option.
    self._SetContentTypeHeader(src_uri, headers)
    self._LogCopyOperation(src_uri, dst_uri, headers)
    (preserve_acl, canned_acl, headers) = (
        self._ProcessCopyObjectToObjectOptions(dst_uri, headers))
    if preserve_acl:
      if src_uri.get_provider() != dst_uri.get_provider():
        # We don't attempt to preserve ACLs across providers because
        # GCS and S3 support different ACLs and disjoint principals.
        raise NotImplementedError('Cross-provider cp -p not supported')
      # We need to read and write the ACL manually because the
      # Key.set_contents_from_file() API doesn't provide a preserve_acl
      # parameter (unlike the Bucket.copy_key() API used
      # by_CopyObjToObjInTheCloud).
      acl = src_uri.get_acl(headers=headers)
    fp = KeyFile(src_key)
    result = self._PerformResumableUploadIfApplies(fp, src_uri,
                                                   dst_uri, canned_acl, headers,
                                                   self._GetFileSize(fp))
    if preserve_acl:
      # If user specified noclobber flag, we need to remove the
      # x-goog-if-generation-match:0 header that was set when uploading the
      # object, because that precondition would fail when updating the ACL on
      # the now-existing object.
      # NOTE(review): this del assumes the header is present, but it's only
      # set for gs destinations — confirm no_clobber + -p with a non-gs
      # destination can't reach here, else this raises KeyError.
      if self.no_clobber:
        del headers['x-goog-if-generation-match']
      # Remove the owner field from the ACL in case we're copying from an object
      # that is owned by a different user. If we left that other user in the
      # ACL, attempting to set the ACL would result in a 400 (Bad Request).
      if hasattr(acl, 'owner'):
        del acl.owner
      dst_uri.set_acl(acl, dst_uri.object_name, headers=headers)
    return result
1584
  def _PerformCopy(self, src_uri, dst_uri, allow_splitting=True):
    """Performs copy from src_uri to dst_uri, handling various special cases.

    Args:
      src_uri: Source StorageUri.
      dst_uri: Destination StorageUri.
      allow_splitting: Whether to allow the file to be split into component
          pieces for a parallel composite upload.

    Returns:
      (elapsed_time, bytes_transferred, version-specific dst_uri) excluding
      overhead like initial HEAD.

    Raises:
      CommandException: if errors encountered.
      ItemExistsError: if no_clobber is set and the destination already exists.
    """
    # Make a copy of the input headers each time so we can set a different
    # content type for each object.
    headers = self.headers.copy() if self.headers else {}
    download_headers = headers.copy()
    # Add accept encoding for download operation.
    AddAcceptEncoding(download_headers)

    src_key = src_uri.get_key(False, download_headers)
    if not src_key:
      raise CommandException('"%s" does not exist.' % src_uri)

    if self.use_manifest:
      # Set the source size in the manifest.
      self.manifest.Set(src_uri, 'size', getattr(src_key, 'size', None))

    # On Windows, stdin is opened as text mode instead of binary which causes
    # problems when piping a binary file, so this switches it to binary mode.
    if IS_WINDOWS and src_uri.is_file_uri() and src_key.is_stream():
      import msvcrt
      msvcrt.setmode(src_key.fp.fileno(), os.O_BINARY)

    if self.no_clobber:
      # There are two checks to prevent clobbering:
      # 1) The first check is to see if the item
      #    already exists at the destination and prevent the upload/download
      #    from happening. This is done by the exists() call.
      # 2) The second check is only relevant if we are writing to gs. We can
      #    enforce that the server only writes the object if it doesn't exist
      #    by specifying the header below. This check only happens at the
      #    server after the complete file has been uploaded. We specify this
      #    header to prevent a race condition where a destination file may
      #    be created after the first check and before the file is fully
      #    uploaded.
      # In order to save on unnecessary uploads/downloads we perform both
      # checks. However, this may come at the cost of additional HTTP calls.
      if dst_uri.exists(download_headers):
        if dst_uri.is_file_uri():
          # The local file may be a partial. Check the file sizes.
          if src_key.size == dst_uri.get_key(headers=download_headers).size:
            raise ItemExistsError()
        else:
          raise ItemExistsError()
      if dst_uri.is_cloud_uri() and dst_uri.scheme == 'gs':
        headers['x-goog-if-generation-match'] = '0'

    # Dispatch on the four (cloud, file) src/dest combinations.
    if src_uri.is_cloud_uri() and dst_uri.is_cloud_uri():
      if src_uri.scheme == dst_uri.scheme and not self.daisy_chain:
        return self._CopyObjToObjInTheCloud(src_key, src_uri, dst_uri, headers)
      else:
        return self._CopyObjToObjDaisyChainMode(src_key, src_uri, dst_uri,
                                                headers)
    elif src_uri.is_file_uri() and dst_uri.is_cloud_uri():
      return self._UploadFileToObject(src_key, src_uri, dst_uri,
                                      download_headers)
    elif src_uri.is_cloud_uri() and dst_uri.is_file_uri():
      return self._DownloadObjectToFile(src_key, src_uri, dst_uri,
                                        download_headers)
    elif src_uri.is_file_uri() and dst_uri.is_file_uri():
      return self._CopyFileToFile(src_key, src_uri, dst_uri, download_headers)
    else:
      raise CommandException('Unexpected src/dest case')
1662
  def _PartitionFile(self, fp, file_size, src_uri, headers, canned_acl, bucket,
                     random_prefix, tracker_file, tracker_file_lock):
    """Partitions a file into FilePart objects to be uploaded and later composed
    into an object matching the original file. This entails splitting the
    file into parts, naming and forming a destination URI for each part,
    and also providing the PerformResumableUploadIfAppliesArgs object
    corresponding to each part.

    Args:
      fp: The file object to be partitioned.
      file_size: The size of fp, in bytes.
      src_uri: The source StorageUri formed from the original command.
      headers: The headers which are ultimately passed to boto.
      canned_acl: The user-provided canned_acl, if applicable.
      bucket: The name of the destination bucket, of the form gs://bucket
      random_prefix: The randomly-generated prefix used to prevent collisions
          among the temporary component names.
      tracker_file: The path to the parallel composite upload tracker file.
      tracker_file_lock: The lock protecting access to the tracker file.

    Returns:
      dst_args: Dict mapping each temporary component's object name to the
          PerformResumableUploadIfAppliesArgs needed to upload it.
    """
    parallel_composite_upload_component_size = HumanReadableToBytes(
        boto.config.get('GSUtil', 'parallel_composite_upload_component_size',
                        DEFAULT_PARALLEL_COMPOSITE_UPLOAD_COMPONENT_SIZE))
    (num_components, component_size) = _GetPartitionInfo(file_size,
        MAX_COMPOSE_ARITY, parallel_composite_upload_component_size)

    # Make sure that the temporary objects don't already exist.
    # NOTE(review): tmp_object_headers is constructed but not used within this
    # method — confirm whether it was meant to be passed along.
    tmp_object_headers = copy.deepcopy(headers)
    tmp_object_headers['x-goog-if-generation-match'] = '0'

    uri_strs = []  # Used to create a NameExpansionIterator.
    # NOTE(review): uri_strs is populated below but never used or returned
    # here; also, bucket already carries the gs:// prefix so these strings get
    # a doubled scheme — verify before relying on them.
    dst_args = {}  # Arguments to create commands and pass to subprocesses.
    file_names = []  # Used for the 2-step process of forming dst_args.
    for i in range(num_components):
      # "Salt" the object name with something a user is very unlikely to have
      # used in an object name, then hash the extended name to make sure
      # we don't run into problems with name length. Using a deterministic
      # naming scheme for the temporary components allows users to take
      # advantage of resumable uploads for each component.
      encoded_name = (PARALLEL_UPLOAD_STATIC_SALT + fp.name).encode('utf-8')
      content_md5 = md5()
      content_md5.update(encoded_name)
      digest = content_md5.hexdigest()
      temp_file_name = (random_prefix + PARALLEL_UPLOAD_TEMP_NAMESPACE +
                        digest + '_' + str(i))
      tmp_dst_uri = MakeGsUri(bucket, temp_file_name, self.suri_builder)

      if i < (num_components - 1):
        # Every component except possibly the last is the same size.
        file_part_length = component_size
      else:
        # The last component just gets all of the remaining bytes.
        file_part_length = (file_size - ((num_components -1) * component_size))
      offset = i * component_size
      func_args = PerformResumableUploadIfAppliesArgs(
          fp.name, offset, file_part_length, src_uri, tmp_dst_uri, canned_acl,
          headers, tracker_file, tracker_file_lock)
      file_names.append(temp_file_name)
      dst_args[temp_file_name] = func_args
      uri_strs.append(self._MakeGsUriStr(bucket, temp_file_name))

    return dst_args
1728
1729 def _MakeFileUri(self, filename):
1730 """Returns a StorageUri for a local file."""
1731 return self.suri_builder.StorageUri(filename)
1732
1733 def _MakeGsUriStr(self, bucket, filename):
1734 """Returns a string of the form gs://bucket/filename, used to indicate an
1735 object in Google Cloud Storage.
1736 """
1737 return 'gs://' + bucket + '/' + filename
1738
  def _DoParallelCompositeUpload(self, fp, src_uri, dst_uri, headers,
                                 canned_acl, file_size):
    """Uploads a local file using the Parallel Composite Uploads feature.

    The file is partitioned into parts, the parts are uploaded in parallel
    to temporary objects, the temporary objects are composed to form the
    final destination object, and the temporaries are then deleted. A
    tracker file records per-component progress so that an interrupted
    upload can be resumed without re-uploading finished components.

    Args:
      fp: The file object to be uploaded.
      src_uri: The StorageURI of the local file.
      dst_uri: The StorageURI of the destination file.
      headers: The headers to pass to boto, if any.
      canned_acl: The canned acl to apply to the object, if any.
      file_size: The size of the source file in bytes.

    Returns:
      (elapsed_time_in_seconds, total_bytes_uploaded, result_uri) tuple,
      where total_bytes_uploaded counts only bytes transferred by this run
      (previously-uploaded components are not re-counted).

    Raises:
      CommandException: if one or more components failed to upload.
    """
    start_time = time.time()
    gs_prefix = 'gs://'
    bucket = gs_prefix + dst_uri.bucket_name
    # Drop a content-type header that is present but empty, so an empty
    # value isn't sent with the component uploads.
    if 'content-type' in headers and not headers['content-type']:
      del headers['content-type']

    # Determine which components, if any, have already been successfully
    # uploaded (by a previous, interrupted run of this command).
    tracker_file = self._GetTrackerFilePath(dst_uri,
                                            TrackerFileType.PARALLEL_UPLOAD,
                                            src_uri)
    tracker_file_lock = CreateLock()
    (random_prefix, existing_components) = (
        _ParseParallelUploadTrackerFile(tracker_file, tracker_file_lock))

    # Create the initial tracker file for the upload.
    _CreateParallelUploadTrackerFile(tracker_file, random_prefix,
                                     existing_components, tracker_file_lock)

    # Get the set of all components that should be uploaded.
    dst_args = self._PartitionFile(fp, file_size, src_uri, headers, canned_acl,
                                   bucket, random_prefix, tracker_file,
                                   tracker_file_lock)

    (components_to_upload, existing_components, existing_objects_to_delete) = (
        FilterExistingComponents(dst_args, existing_components, bucket,
                                 self.suri_builder))

    # In parallel, copy all of the file parts that haven't already been
    # uploaded to temporary objects.
    cp_results = self.Apply(_PerformResumableUploadIfAppliesWrapper,
                            components_to_upload,
                            _CopyExceptionHandler,
                            ('copy_failure_count', 'total_bytes_transferred'),
                            arg_checker=gslib.command.DummyArgChecker,
                            parallel_operations_override=True,
                            should_return_results=True)
    uploaded_components = []
    total_bytes_uploaded = 0
    # Each cp_result carries the bytes transferred at index 1 and the
    # uploaded component at index 2.
    for cp_result in cp_results:
      total_bytes_uploaded += cp_result[1]
      uploaded_components.append(cp_result[2])
    components = uploaded_components + existing_components

    if len(components) == len(dst_args):
      # Only try to compose if all of the components were uploaded successfully.

      # Sort the components so that they will be composed in the correct order.
      # Component object names carry their ordinal index after the final '_'.
      components = sorted(
          components, key=lambda component:
          int(component.object_name[component.object_name.rfind('_')+1:]))
      result_uri = dst_uri.compose(components, headers=headers)

      try:
        # Make sure only to delete things that we know were successfully
        # uploaded (as opposed to all of the objects that we attempted to
        # create) so that we don't delete any preexisting objects, except for
        # those that were uploaded by a previous, failed run and have since
        # changed (but still have an old generation lying around).
        objects_to_delete = components + existing_objects_to_delete
        self.Apply(_DeleteKeyFn, objects_to_delete, _RmExceptionHandler,
                   arg_checker=gslib.command.DummyArgChecker,
                   parallel_operations_override=True)
      except Exception, e:
        if (e.message and ('unexpected failure in' in e.message)
            and ('sub-processes, aborting' in e.message)):
          # If some of the delete calls fail, don't cause the whole command to
          # fail. The copy was successful iff the compose call succeeded, so
          # just raise whatever exception (if any) happened before this instead,
          # and reduce this to a warning.
          logging.warning(
              'Failed to delete some of the following temporary objects:\n' +
              '\n'.join(dst_args.keys()))
        else:
          raise e
      finally:
        # The upload is complete (or failed during cleanup); in either case
        # the tracker file is no longer needed.
        with tracker_file_lock:
          if os.path.exists(tracker_file):
            os.unlink(tracker_file)
    else:
      # Some of the components failed to upload. In this case, we want to exit
      # without deleting the objects (the tracker file is kept, too, so a
      # retry can reuse the components that did succeed).
      raise CommandException(
          'Some temporary components were not uploaded successfully. '
          'Please retry this upload.')

    return (time.time() - start_time, total_bytes_uploaded, result_uri)
1841
1842 def _ShouldDoParallelCompositeUpload(self, allow_splitting, src_key, dst_uri,
1843 file_size):
1844 """Returns True iff a parallel upload should be performed on the source key.
1845
1846 Args:
1847 allow_splitting: If false, then this function returns false.
1848 src_key: Corresponding to a local file.
1849 dst_uri: Corresponding to an object in the cloud.
1850 file_size: The size of the source file, in bytes.
1851 """
1852 parallel_composite_upload_threshold = HumanReadableToBytes(boto.config.get(
1853 'GSUtil', 'parallel_composite_upload_threshold',
1854 DEFAULT_PARALLEL_COMPOSITE_UPLOAD_THRESHOLD))
1855 return (allow_splitting # Don't split the pieces multiple times.
1856 and not src_key.is_stream() # We can't partition streams.
1857 and dst_uri.scheme == 'gs' # Compose is only for gs.
1858 and parallel_composite_upload_threshold > 0
1859 and file_size >= parallel_composite_upload_threshold
1860 and file_size >= MIN_PARALLEL_COMPOSITE_FILE_SIZE)
1861
  def _ExpandDstUri(self, dst_uri_str):
    """Expands the wildcard, if present, in dst_uri_str.

    Args:
      dst_uri_str: String representation of requested dst_uri.

    Returns:
      (exp_dst_uri, have_existing_dst_container)
      where have_existing_dst_container is a bool indicating whether
      exp_dst_uri names an existing directory, bucket, or bucket subdirectory.

    Raises:
      CommandException: if dst_uri_str matched more than 1 URI.
    """
    dst_uri = self.suri_builder.StorageUri(dst_uri_str)

    # Handle wildcarded dst_uri case.
    if ContainsWildcard(dst_uri):
      blr_expansion = list(self.WildcardIterator(dst_uri))
      # The destination must be unambiguous, so insist on exactly one match.
      if len(blr_expansion) != 1:
        raise CommandException('Destination (%s) must match exactly 1 URI' %
                               dst_uri_str)
      blr = blr_expansion[0]
      uri = blr.GetUri()
      if uri.is_cloud_uri():
        # A cloud match is a container if it is a bucket, has a prefix, or is
        # an object whose name ends with '/'.
        return (uri, uri.names_bucket() or blr.HasPrefix()
                or blr.GetKey().name.endswith('/'))
      else:
        return (uri, uri.names_directory())

    # Handle non-wildcarded dst_uri:
    if dst_uri.is_file_uri():
      return (dst_uri, dst_uri.names_directory())
    if dst_uri.names_bucket():
      return (dst_uri, True)
    # For object URIs check 3 cases: (a) if the name ends with '/' treat as a
    # subdir; else, perform a wildcard expansion with dst_uri + "*" and then
    # find if (b) there's a Prefix matching dst_uri, or (c) name is of form
    # dir_$folder$ (and in both these cases also treat dir as a subdir).
    if dst_uri.is_cloud_uri() and dst_uri_str.endswith('/'):
      return (dst_uri, True)
    blr_expansion = list(self.WildcardIterator(
        '%s*' % dst_uri_str.rstrip(dst_uri.delim)))
    for blr in blr_expansion:
      if blr.GetRStrippedUriString().endswith('_$folder$'):
        return (dst_uri, True)
      if blr.GetRStrippedUriString() == dst_uri_str.rstrip(dst_uri.delim):
        return (dst_uri, blr.HasPrefix())
    # No existing container found; the destination names a new object.
    return (dst_uri, False)
1912
  def _ConstructDstUri(self, src_uri, exp_src_uri,
                       src_uri_names_container, src_uri_expands_to_multi,
                       have_multiple_srcs, exp_dst_uri,
                       have_existing_dest_subdir):
    """Constructs the destination URI for a given exp_src_uri/exp_dst_uri pair.

    Uses context-dependent naming rules that mimic Linux cp and mv behavior.

    Args:
      src_uri: src_uri to be copied.
      exp_src_uri: Single StorageUri from wildcard expansion of src_uri.
      src_uri_names_container: True if src_uri names a container (including the
          case of a wildcard-named bucket subdir (like gs://bucket/abc,
          where gs://bucket/abc/* matched some objects). Note that this is
          additional semantics that src_uri.names_container() doesn't
          understand because the latter only understands StorageUris, not
          wildcards.
      src_uri_expands_to_multi: True if src_uri expanded to multiple URIs.
      have_multiple_srcs: True if this is a multi-source request. This can be
          true if src_uri wildcard-expanded to multiple URIs or if there were
          multiple source URIs in the request.
      exp_dst_uri: the expanded StorageUri requested for the cp destination.
          Final written path is constructed from this plus a context-dependent
          variant of src_uri.
      have_existing_dest_subdir: bool indicator whether dest is an existing
          subdirectory.

    Returns:
      StorageUri to use for copy.

    Raises:
      CommandException: if destination object name not specified for
          source and source is a stream.
    """
    if self._ShouldTreatDstUriAsSingleton(
        have_multiple_srcs, have_existing_dest_subdir, exp_dst_uri):
      # We're copying one file or object to one file or object.
      return exp_dst_uri

    if exp_src_uri.is_stream():
      # A stream has no name to derive a destination object name from, so the
      # user must have named the destination object explicitly.
      if exp_dst_uri.names_container():
        raise CommandException('Destination object name needed when '
                               'source is a stream')
      return exp_dst_uri

    if not self.recursion_requested and not have_multiple_srcs:
      # We're copying one file or object to a subdirectory. Append final comp
      # of exp_src_uri to exp_dst_uri.
      src_final_comp = exp_src_uri.object_name.rpartition(src_uri.delim)[-1]
      return self.suri_builder.StorageUri('%s%s%s' % (
          exp_dst_uri.uri.rstrip(exp_dst_uri.delim), exp_dst_uri.delim,
          src_final_comp))

    # Else we're copying multiple sources to a directory, bucket, or a bucket
    # "sub-directory".

    # Ensure exp_dst_uri ends in delim char if we're doing a multi-src copy or
    # a copy to a directory. (The check for copying to a directory needs
    # special-case handling so that the command:
    #   gsutil cp gs://bucket/obj dir
    # will turn into file://dir/ instead of file://dir -- the latter would
    # cause the file "dirobj" to be created.)
    # Note: need to check have_multiple_srcs or src_uri.names_container()
    # because src_uri could be a bucket containing a single object, named
    # as gs://bucket.
    if ((have_multiple_srcs or src_uri.names_container()
         or os.path.isdir(exp_dst_uri.object_name))
        and not exp_dst_uri.uri.endswith(exp_dst_uri.delim)):
      exp_dst_uri = exp_dst_uri.clone_replace_name(
          '%s%s' % (exp_dst_uri.object_name, exp_dst_uri.delim)
      )

    # Making naming behavior match how things work with local Linux cp and mv
    # operations depends on many factors, including whether the destination is
    # a container, the plurality of the source(s), and whether the mv command
    # is being used:
    # 1. For the "mv" command that specifies a non-existent destination subdir,
    #    renaming should occur at the level of the src subdir, vs appending
    #    that subdir beneath the dst subdir like is done for copying. For
    #    example:
    #      gsutil rm -R gs://bucket
    #      gsutil cp -R dir1 gs://bucket
    #      gsutil cp -R dir2 gs://bucket/subdir1
    #      gsutil mv gs://bucket/subdir1 gs://bucket/subdir2
    #    would (if using cp naming behavior) end up with paths like:
    #      gs://bucket/subdir2/subdir1/dir2/.svn/all-wcprops
    #    whereas mv naming behavior should result in:
    #      gs://bucket/subdir2/dir2/.svn/all-wcprops
    # 2. Copying from directories, buckets, or bucket subdirs should result in
    #    objects/files mirroring the source directory hierarchy. For example:
    #      gsutil cp dir1/dir2 gs://bucket
    #    should create the object gs://bucket/dir2/file2, assuming dir1/dir2
    #    contains file2).
    #    To be consistent with Linux cp behavior, there's one more wrinkle when
    #    working with subdirs: The resulting object names depend on whether the
    #    destination subdirectory exists. For example, if gs://bucket/subdir
    #    exists, the command:
    #      gsutil cp -R dir1/dir2 gs://bucket/subdir
    #    should create objects named like gs://bucket/subdir/dir2/a/b/c. In
    #    contrast, if gs://bucket/subdir does not exist, this same command
    #    should create objects named like gs://bucket/subdir/a/b/c.
    # 3. Copying individual files or objects to dirs, buckets or bucket subdirs
    #    should result in objects/files named by the final source file name
    #    component. Example:
    #      gsutil cp dir1/*.txt gs://bucket
    #    should create the objects gs://bucket/f1.txt and gs://bucket/f2.txt,
    #    assuming dir1 contains f1.txt and f2.txt.

    if (self.perform_mv and self.recursion_requested
        and src_uri_expands_to_multi and not have_existing_dest_subdir):
      # Case 1. Handle naming rules for bucket subdir mv. Here we want to
      # line up the src_uri against its expansion, to find the base to build
      # the new name. For example, running the command:
      #   gsutil mv gs://bucket/abcd gs://bucket/xyz
      # when processing exp_src_uri=gs://bucket/abcd/123
      # exp_src_uri_tail should become /123
      # Note: mv.py code disallows wildcard specification of source URI.
      exp_src_uri_tail = exp_src_uri.uri[len(src_uri.uri):]
      dst_key_name = '%s/%s' % (exp_dst_uri.object_name.rstrip('/'),
                                exp_src_uri_tail.strip('/'))
      return exp_dst_uri.clone_replace_name(dst_key_name)

    if src_uri_names_container and not exp_dst_uri.names_file():
      # Case 2. Build dst_key_name from subpath of exp_src_uri past
      # where src_uri ends. For example, for src_uri=gs://bucket/ and
      # exp_src_uri=gs://bucket/src_subdir/obj, dst_key_name should be
      # src_subdir/obj.
      src_uri_path_sans_final_dir = _GetPathBeforeFinalDir(src_uri)
      if exp_src_uri.is_cloud_uri():
        dst_key_name = exp_src_uri.versionless_uri[
            len(src_uri_path_sans_final_dir):].lstrip(src_uri.delim)
      else:
        dst_key_name = exp_src_uri.uri[
            len(src_uri_path_sans_final_dir):].lstrip(src_uri.delim)
      # Handle case where dst_uri is a non-existent subdir.
      if not have_existing_dest_subdir:
        dst_key_name = dst_key_name.partition(src_uri.delim)[-1]
      # Handle special case where src_uri was a directory named with '.' or
      # './', so that running a command like:
      #   gsutil cp -r . gs://dest
      # will produce obj names of the form gs://dest/abc instead of
      # gs://dest/./abc.
      if dst_key_name.startswith('.%s' % os.sep):
        dst_key_name = dst_key_name[2:]

    else:
      # Case 3. Name the destination after the final source name component.
      dst_key_name = exp_src_uri.object_name.rpartition(src_uri.delim)[-1]

    # Prepend the destination container's object name (if any) so the new key
    # lands "inside" the directory/bucket subdir.
    if (exp_dst_uri.is_file_uri()
        or self._ShouldTreatDstUriAsBucketSubDir(
            have_multiple_srcs, exp_dst_uri, have_existing_dest_subdir)):
      if exp_dst_uri.object_name.endswith(exp_dst_uri.delim):
        dst_key_name = '%s%s%s' % (
            exp_dst_uri.object_name.rstrip(exp_dst_uri.delim),
            exp_dst_uri.delim, dst_key_name)
      else:
        delim = exp_dst_uri.delim if exp_dst_uri.object_name else ''
        dst_key_name = '%s%s%s' % (exp_dst_uri.object_name, delim, dst_key_name)

    return exp_dst_uri.clone_replace_name(dst_key_name)
2072
2073 def _FixWindowsNaming(self, src_uri, dst_uri):
2074 """
2075 Rewrites the destination URI built by _ConstructDstUri() to translate
2076 Windows pathnames to cloud pathnames if needed.
2077
2078 Args:
2079 src_uri: Source URI to be copied.
2080 dst_uri: The destination URI built by _ConstructDstUri().
2081
2082 Returns:
2083 StorageUri to use for copy.
2084 """
2085 if (src_uri.is_file_uri() and src_uri.delim == '\\'
2086 and dst_uri.is_cloud_uri()):
2087 trans_uri_str = re.sub(r'\\', '/', dst_uri.uri)
2088 dst_uri = self.suri_builder.StorageUri(trans_uri_str)
2089 return dst_uri
2090
2091 def _CopyFunc(self, name_expansion_result):
2092 """Worker function for performing the actual copy (and rm, for mv).""" 607 """Worker function for performing the actual copy (and rm, for mv)."""
2093 (exp_dst_uri, have_existing_dst_container) = self._ExpandDstUri( 608 gsutil_api = GetCloudApiInstance(self, thread_state=thread_state)
2094 self.args[-1]) 609
2095 if self.perform_mv: 610 copy_helper_opts = copy_helper.GetCopyHelperOpts()
611 if copy_helper_opts.perform_mv:
2096 cmd_name = 'mv' 612 cmd_name = 'mv'
2097 else: 613 else:
2098 cmd_name = self.command_name 614 cmd_name = self.command_name
2099 src_uri = self.suri_builder.StorageUri( 615 src_url = name_expansion_result.source_storage_url
2100 name_expansion_result.GetSrcUriStr()) 616 exp_src_url = name_expansion_result.expanded_storage_url
2101 exp_src_uri = self.suri_builder.StorageUri( 617 src_url_names_container = name_expansion_result.names_container
2102 name_expansion_result.GetExpandedUriStr()) 618 have_multiple_srcs = name_expansion_result.is_multi_source_request
2103 src_uri_names_container = name_expansion_result.NamesContainer() 619
2104 src_uri_expands_to_multi = name_expansion_result.NamesContainer() 620 if src_url.IsCloudUrl() and src_url.IsProvider():
2105 have_multiple_srcs = name_expansion_result.IsMultiSrcRequest()
2106 have_existing_dest_subdir = (
2107 name_expansion_result.HaveExistingDstContainer())
2108 if src_uri.names_provider():
2109 raise CommandException( 621 raise CommandException(
2110 'The %s command does not allow provider-only source URIs (%s)' % 622 'The %s command does not allow provider-only source URLs (%s)' %
2111 (cmd_name, src_uri)) 623 (cmd_name, src_url))
2112 if have_multiple_srcs: 624 if have_multiple_srcs:
2113 self._InsistDstUriNamesContainer(exp_dst_uri, 625 copy_helper.InsistDstUrlNamesContainer(
2114 have_existing_dst_container, 626 self.exp_dst_url, self.have_existing_dst_container, cmd_name)
2115 cmd_name) 627
2116 628 # Various GUI tools (like the GCS web console) create placeholder objects
2117 629 # ending with '/' when the user creates an empty directory. Normally these
2118 if self.use_manifest and self.manifest.WasSuccessful(str(exp_src_uri)): 630 # tools should delete those placeholders once objects have been written
631 # "under" the directory, but sometimes the placeholders are left around. We
632 # need to filter them out here, otherwise if the user tries to rsync from
633 # GCS to a local directory it will result in a directory/file conflict
634 # (e.g., trying to download an object called "mydata/" where the local
635 # directory "mydata" exists).
636 if IsCloudSubdirPlaceholder(exp_src_url):
637 self.logger.info('Skipping cloud sub-directory placeholder object %s',
638 exp_src_url)
2119 return 639 return
2120 640
2121 if self.perform_mv: 641 if copy_helper_opts.use_manifest and self.manifest.WasSuccessful(
2122 if name_expansion_result.NamesContainer(): 642 exp_src_url.url_string):
643 return
644
645 if copy_helper_opts.perform_mv:
646 if name_expansion_result.names_container:
2123 # Use recursion_requested when performing name expansion for the 647 # Use recursion_requested when performing name expansion for the
2124 # directory mv case so we can determine if any of the source URIs are 648 # directory mv case so we can determine if any of the source URLs are
2125 # directories (and then use cp -R and rm -R to perform the move, to 649 # directories (and then use cp -R and rm -R to perform the move, to
2126 # match the behavior of Linux mv (which when moving a directory moves 650 # match the behavior of Linux mv (which when moving a directory moves
2127 # all the contained files). 651 # all the contained files).
2128 self.recursion_requested = True 652 self.recursion_requested = True
2129 # Disallow wildcard src URIs when moving directories, as supporting it 653 # Disallow wildcard src URLs when moving directories, as supporting it
2130 # would make the name transformation too complex and would also be 654 # would make the name transformation too complex and would also be
2131 # dangerous (e.g., someone could accidentally move many objects to the 655 # dangerous (e.g., someone could accidentally move many objects to the
2132 # wrong name, or accidentally overwrite many objects). 656 # wrong name, or accidentally overwrite many objects).
2133 if ContainsWildcard(src_uri): 657 if ContainsWildcard(src_url.url_string):
2134 raise CommandException('The mv command disallows naming source ' 658 raise CommandException('The mv command disallows naming source '
2135 'directories using wildcards') 659 'directories using wildcards')
2136 660
2137 if (exp_dst_uri.is_file_uri() 661 if (self.exp_dst_url.IsFileUrl()
2138 and not os.path.exists(exp_dst_uri.object_name) 662 and not os.path.exists(self.exp_dst_url.object_name)
2139 and have_multiple_srcs): 663 and have_multiple_srcs):
2140 os.makedirs(exp_dst_uri.object_name) 664 os.makedirs(self.exp_dst_url.object_name)
2141 665
2142 dst_uri = self._ConstructDstUri(src_uri, exp_src_uri, 666 dst_url = copy_helper.ConstructDstUrl(
2143 src_uri_names_container, 667 src_url, exp_src_url, src_url_names_container, have_multiple_srcs,
2144 src_uri_expands_to_multi, 668 self.exp_dst_url, self.have_existing_dst_container,
2145 have_multiple_srcs, exp_dst_uri, 669 self.recursion_requested)
2146 have_existing_dest_subdir) 670 dst_url = copy_helper.FixWindowsNaming(src_url, dst_url)
2147 dst_uri = self._FixWindowsNaming(src_uri, dst_uri) 671
2148 672 copy_helper.CheckForDirFileConflict(exp_src_url, dst_url)
2149 self._CheckForDirFileConflict(exp_src_uri, dst_uri) 673 if copy_helper.SrcDstSame(exp_src_url, dst_url):
2150 if self._SrcDstSame(exp_src_uri, dst_uri):
2151 raise CommandException('%s: "%s" and "%s" are the same file - ' 674 raise CommandException('%s: "%s" and "%s" are the same file - '
2152 'abort.' % (cmd_name, exp_src_uri, dst_uri)) 675 'abort.' % (cmd_name, exp_src_url, dst_url))
2153 676
2154 if dst_uri.is_cloud_uri() and dst_uri.is_version_specific: 677 if dst_url.IsCloudUrl() and dst_url.HasGeneration():
2155 raise CommandException('%s: a version-specific URI\n(%s)\ncannot be ' 678 raise CommandException('%s: a version-specific URL\n(%s)\ncannot be '
2156 'the destination for gsutil cp - abort.' 679 'the destination for gsutil cp - abort.'
2157 % (cmd_name, dst_uri)) 680 % (cmd_name, dst_url))
2158 681
2159 elapsed_time = bytes_transferred = 0 682 elapsed_time = bytes_transferred = 0
2160 try: 683 try:
2161 if self.use_manifest: 684 if copy_helper_opts.use_manifest:
2162 self.manifest.Initialize(exp_src_uri, dst_uri) 685 self.manifest.Initialize(
2163 (elapsed_time, bytes_transferred, result_uri) = ( 686 exp_src_url.url_string, dst_url.url_string)
2164 self._PerformCopy(exp_src_uri, dst_uri)) 687 (elapsed_time, bytes_transferred, result_url, md5) = (
2165 if self.use_manifest: 688 copy_helper.PerformCopy(
2166 if hasattr(dst_uri, 'md5'): 689 self.logger, exp_src_url, dst_url, gsutil_api,
2167 self.manifest.Set(exp_src_uri, 'md5', dst_uri.md5) 690 self, _CopyExceptionHandler, allow_splitting=True,
2168 self.manifest.SetResult(exp_src_uri, bytes_transferred, 'OK') 691 headers=self.headers, manifest=self.manifest,
692 gzip_exts=self.gzip_exts, test_method=self.test_method))
693 if copy_helper_opts.use_manifest:
694 if md5:
695 self.manifest.Set(exp_src_url.url_string, 'md5', md5)
696 self.manifest.SetResult(
697 exp_src_url.url_string, bytes_transferred, 'OK')
698 if copy_helper_opts.print_ver:
699 # Some cases don't return a version-specific URL (e.g., if destination
700 # is a file).
701 self.logger.info('Created: %s', result_url)
2169 except ItemExistsError: 702 except ItemExistsError:
2170 message = 'Skipping existing item: %s' % dst_uri.uri 703 message = 'Skipping existing item: %s' % dst_url
2171 self.logger.info(message) 704 self.logger.info(message)
2172 if self.use_manifest: 705 if copy_helper_opts.use_manifest:
2173 self.manifest.SetResult(exp_src_uri, 0, 'skip', message) 706 self.manifest.SetResult(exp_src_url.url_string, 0, 'skip', message)
2174 except Exception, e: 707 except Exception, e:
2175 if self._IsNoClobberServerException(e): 708 if (copy_helper_opts.no_clobber and
2176 message = 'Rejected (noclobber): %s' % dst_uri.uri 709 copy_helper.IsNoClobberServerException(e)):
710 message = 'Rejected (noclobber): %s' % dst_url
2177 self.logger.info(message) 711 self.logger.info(message)
2178 if self.use_manifest: 712 if copy_helper_opts.use_manifest:
2179 self.manifest.SetResult(exp_src_uri, 0, 'skip', message) 713 self.manifest.SetResult(
714 exp_src_url.url_string, 0, 'skip', message)
2180 elif self.continue_on_error: 715 elif self.continue_on_error:
2181 message = 'Error copying %s: %s' % (src_uri.uri, str(e)) 716 message = 'Error copying %s: %s' % (src_url, str(e))
2182 self.copy_failure_count += 1 717 self.op_failure_count += 1
2183 self.logger.error(message) 718 self.logger.error(message)
2184 if self.use_manifest: 719 if copy_helper_opts.use_manifest:
2185 self.manifest.SetResult(exp_src_uri, 0, 'error', message) 720 self.manifest.SetResult(
721 exp_src_url.url_string, 0, 'error',
722 RemoveCRLFFromString(message))
2186 else: 723 else:
2187 if self.use_manifest: 724 if copy_helper_opts.use_manifest:
2188 self.manifest.SetResult(exp_src_uri, 0, 'error', str(e)) 725 self.manifest.SetResult(
726 exp_src_url.url_string, 0, 'error', str(e))
2189 raise 727 raise
2190 728
2191 if self.print_ver:
2192 # Some cases don't return a version-specific URI (e.g., if destination
2193 # is a file).
2194 if hasattr(result_uri, 'version_specific_uri'):
2195 self.logger.info('Created: %s' % result_uri.version_specific_uri)
2196 else:
2197 self.logger.info('Created: %s' % result_uri.uri)
2198
2199 # TODO: If we ever use -n (noclobber) with -M (move) (not possible today 729 # TODO: If we ever use -n (noclobber) with -M (move) (not possible today
2200 # since we call copy internally from move and don't specify the -n flag) 730 # since we call copy internally from move and don't specify the -n flag)
2201 # we'll need to only remove the source when we have not skipped the 731 # we'll need to only remove the source when we have not skipped the
2202 # destination. 732 # destination.
2203 if self.perform_mv: 733 if copy_helper_opts.perform_mv:
2204 self.logger.info('Removing %s...', exp_src_uri) 734 self.logger.info('Removing %s...', exp_src_url)
2205 exp_src_uri.delete_key(validate=False, headers=self.headers) 735 if exp_src_url.IsCloudUrl():
736 gsutil_api.DeleteObject(exp_src_url.bucket_name,
737 exp_src_url.object_name,
738 generation=exp_src_url.generation,
739 provider=exp_src_url.scheme)
740 else:
741 os.unlink(exp_src_url.object_name)
742
2206 with self.stats_lock: 743 with self.stats_lock:
2207 self.total_elapsed_time += elapsed_time 744 self.total_elapsed_time += elapsed_time
2208 self.total_bytes_transferred += bytes_transferred 745 self.total_bytes_transferred += bytes_transferred
2209 746
2210 # Command entry point. 747 # Command entry point.
2211 def RunCommand(self): 748 def RunCommand(self):
2212 self._ParseArgs() 749 copy_helper_opts = self._ParseOpts()
2213 750
2214 self.total_elapsed_time = self.total_bytes_transferred = 0 751 self.total_elapsed_time = self.total_bytes_transferred = 0
2215 if self.args[-1] == '-' or self.args[-1] == 'file://-': 752 if self.args[-1] == '-' or self.args[-1] == 'file://-':
2216 self._HandleStreamingDownload() 753 return CatHelper(self).CatUrlStrings(self.args[:-1])
2217 return 0 754
2218 755 if copy_helper_opts.read_args_from_stdin:
2219 if self.read_args_from_stdin:
2220 if len(self.args) != 1: 756 if len(self.args) != 1:
2221 raise CommandException('Source URIs cannot be specified with -I option') 757 raise CommandException('Source URLs cannot be specified with -I option')
2222 uri_strs = self._StdinIterator() 758 url_strs = copy_helper.StdinIterator()
2223 else: 759 else:
2224 if len(self.args) < 2: 760 if len(self.args) < 2:
2225 raise CommandException('Wrong number of arguments for "cp" command.') 761 raise CommandException('Wrong number of arguments for "cp" command.')
2226 uri_strs = self.args[:-1] 762 url_strs = self.args[:-1]
2227 763
2228 (exp_dst_uri, have_existing_dst_container) = self._ExpandDstUri( 764 (self.exp_dst_url, self.have_existing_dst_container) = (
2229 self.args[-1]) 765 copy_helper.ExpandUrlToSingleBlr(self.args[-1], self.gsutil_api,
766 self.debug, self.project_id))
767
2230 # If the destination bucket has versioning enabled iterate with 768 # If the destination bucket has versioning enabled iterate with
2231 # all_versions=True. That way we'll copy all versions if the source bucket 769 # all_versions=True. That way we'll copy all versions if the source bucket
2232 # is versioned; and by leaving all_versions=False if the destination bucket 770 # is versioned; and by leaving all_versions=False if the destination bucket
2233 # has versioning disabled we will avoid copying old versions all to the same 771 # has versioning disabled we will avoid copying old versions all to the same
2234 # un-versioned destination object. 772 # un-versioned destination object.
773 all_versions = False
2235 try: 774 try:
2236 all_versions = (exp_dst_uri.names_bucket() 775 bucket = self._GetBucketWithVersioningConfig(self.exp_dst_url)
2237 and exp_dst_uri.get_versioning_config(self.headers)) 776 if bucket and bucket.versioning and bucket.versioning.enabled:
2238 except GSResponseError as e:
2239 # This happens if the user doesn't have OWNER access on the bucket (needed
2240 # to check if versioning is enabled). In this case fall back to copying
2241 # all versions (which can be inefficient for the reason noted in the
2242 # comment above). We don't try to warn the user because that would result
2243 # in false positive warnings (since we can't check if versioning is
2244 # enabled on the destination bucket).
2245 if e.status == 403:
2246 all_versions = True 777 all_versions = True
2247 else: 778 except AccessDeniedException:
2248 raise 779 # This happens (in the XML API only) if the user doesn't have OWNER access
780 # on the bucket (needed to check if versioning is enabled). In this case
781 # fall back to copying all versions (which can be inefficient for the
782 # reason noted in the comment above). We don't try to warn the user
783 # because that would result in false positive warnings (since we can't
784 # check if versioning is enabled on the destination bucket).
785 #
786 # For JSON, we will silently not return versioning if we don't have
787 # access.
788 all_versions = True
789
2249 name_expansion_iterator = NameExpansionIterator( 790 name_expansion_iterator = NameExpansionIterator(
2250 self.command_name, self.proj_id_handler, self.headers, self.debug, 791 self.command_name, self.debug,
2251 self.logger, self.bucket_storage_uri_class, uri_strs, 792 self.logger, self.gsutil_api, url_strs,
2252 self.recursion_requested or self.perform_mv, 793 self.recursion_requested or copy_helper_opts.perform_mv,
2253 have_existing_dst_container=have_existing_dst_container, 794 project_id=self.project_id, all_versions=all_versions,
2254 all_versions=all_versions) 795 continue_on_error=self.continue_on_error or self.parallel_operations)
2255 self.have_existing_dst_container = have_existing_dst_container
2256 796
2257 # Use a lock to ensure accurate statistics in the face of 797 # Use a lock to ensure accurate statistics in the face of
2258 # multi-threading/multi-processing. 798 # multi-threading/multi-processing.
2259 self.stats_lock = CreateLock() 799 self.stats_lock = CreateLock()
2260 800
2261 # Tracks if any copies failed. 801 # Tracks if any copies failed.
2262 self.copy_failure_count = 0 802 self.op_failure_count = 0
2263 803
2264 # Start the clock. 804 # Start the clock.
2265 start_time = time.time() 805 start_time = time.time()
2266 806
2267 # Tuple of attributes to share/manage across multiple processes in 807 # Tuple of attributes to share/manage across multiple processes in
2268 # parallel (-m) mode. 808 # parallel (-m) mode.
2269 shared_attrs = ('copy_failure_count', 'total_bytes_transferred') 809 shared_attrs = ('op_failure_count', 'total_bytes_transferred')
2270 810
2271 # Perform copy requests in parallel (-m) mode, if requested, using 811 # Perform copy requests in parallel (-m) mode, if requested, using
2272 # configured number of parallel processes and threads. Otherwise, 812 # configured number of parallel processes and threads. Otherwise,
2273 # perform requests with sequential function calls in current process. 813 # perform requests with sequential function calls in current process.
2274 self.Apply(_CopyFuncWrapper, name_expansion_iterator, 814 self.Apply(_CopyFuncWrapper, name_expansion_iterator,
2275 _CopyExceptionHandler, shared_attrs, fail_on_error=True) 815 _CopyExceptionHandler, shared_attrs,
816 fail_on_error=(not self.continue_on_error))
2276 self.logger.debug( 817 self.logger.debug(
2277 'total_bytes_transferred: %d', self.total_bytes_transferred) 818 'total_bytes_transferred: %d', self.total_bytes_transferred)
2278 819
2279 end_time = time.time() 820 end_time = time.time()
2280 self.total_elapsed_time = end_time - start_time 821 self.total_elapsed_time = end_time - start_time
2281 822
2282 # Sometimes, particularly when running unit tests, the total elapsed time 823 # Sometimes, particularly when running unit tests, the total elapsed time
2283 # is really small. On Windows, the timer resolution is too small and 824 # is really small. On Windows, the timer resolution is too small and
2284 # causes total_elapsed_time to be zero. 825 # causes total_elapsed_time to be zero.
2285 try: 826 try:
2286 float(self.total_bytes_transferred) / float(self.total_elapsed_time) 827 float(self.total_bytes_transferred) / float(self.total_elapsed_time)
2287 except ZeroDivisionError: 828 except ZeroDivisionError:
2288 self.total_elapsed_time = 0.01 829 self.total_elapsed_time = 0.01
2289 830
2290 self.total_bytes_per_second = (float(self.total_bytes_transferred) / 831 self.total_bytes_per_second = (float(self.total_bytes_transferred) /
2291 float(self.total_elapsed_time)) 832 float(self.total_elapsed_time))
2292 833
2293 if self.debug == 3: 834 if self.debug == 3:
2294 # Note that this only counts the actual GET and PUT bytes for the copy 835 # Note that this only counts the actual GET and PUT bytes for the copy
2295 # - not any transfers for doing wildcard expansion, the initial HEAD 836 # - not any transfers for doing wildcard expansion, the initial
2296 # request boto performs when doing a bucket.get_key() operation, etc. 837 # HEAD/GET request performed to get the object metadata, etc.
2297 if self.total_bytes_transferred != 0: 838 if self.total_bytes_transferred != 0:
2298 self.logger.info( 839 self.logger.info(
2299 'Total bytes copied=%d, total elapsed time=%5.3f secs (%sps)', 840 'Total bytes copied=%d, total elapsed time=%5.3f secs (%sps)',
2300 self.total_bytes_transferred, self.total_elapsed_time, 841 self.total_bytes_transferred, self.total_elapsed_time,
2301 MakeHumanReadable(self.total_bytes_per_second)) 842 MakeHumanReadable(self.total_bytes_per_second))
2302 if self.copy_failure_count: 843 if self.op_failure_count:
2303 plural_str = '' 844 plural_str = 's' if self.op_failure_count else ''
2304 if self.copy_failure_count > 1:
2305 plural_str = 's'
2306 raise CommandException('%d file%s/object%s could not be transferred.' % ( 845 raise CommandException('%d file%s/object%s could not be transferred.' % (
2307 self.copy_failure_count, plural_str, plural_str)) 846 self.op_failure_count, plural_str, plural_str))
2308 847
2309 return 0 848 return 0
2310 849
  def _ParseArgs(self):
    """Parses cp command-line options, recording them as attributes on self.

    Reads self.sub_opts (populated by the Command parent class's option
    parsing) and sets a boolean/state attribute for each recognized flag so
    the rest of the command can consult them.
    """
    self.perform_mv = False
    self.exclude_symlinks = False
    self.no_clobber = False
    self.continue_on_error = False
    self.daisy_chain = False
    self.read_args_from_stdin = False
    self.print_ver = False
    self.use_manifest = False

    # self.recursion_requested initialized in command.py (so can be checked
    # in parent class for all commands).
    if self.sub_opts:
      for o, a in self.sub_opts:
        if o == '-c':
          self.continue_on_error = True
        elif o == '-D':
          self.daisy_chain = True
        elif o == '-e':
          self.exclude_symlinks = True
        elif o == '-I':
          self.read_args_from_stdin = True
        elif o == '-L':
          # -L takes a manifest file path as its argument.
          self.use_manifest = True
          self.manifest = _Manifest(a)
        elif o == '-M':
          # Note that we signal to the cp command to perform a move (copy
          # followed by remove) and use directory-move naming rules by passing
          # the undocumented (for internal use) -M option when running the cp
          # command from mv.py.
          self.perform_mv = True
        elif o == '-n':
          self.no_clobber = True
        elif o == '-q':
          # -q is deprecated; warn and fall back to top-level quiet behavior.
          self.logger.warning(
              'Warning: gsutil cp -q is deprecated, and will be removed in the '
              'future.\nPlease use gsutil -q cp ... instead.')
          self.logger.setLevel(level=logging.WARNING)
        elif o == '-r' or o == '-R':
          self.recursion_requested = True
        elif o == '-v':
          self.print_ver = True
2353 925
  def _HandleStreamingDownload(self):
    """Downloads all source URIs (all args but the last) to stdout.

    Accumulates elapsed time and bytes transferred on self, and prints a
    summary when debug == 3.

    Raises:
      CommandException: if no source URI matched any object.
    """
    # Destination is <STDOUT>. Manipulate sys.stdout so as to redirect all
    # debug messages to <STDERR>.
    stdout_fp = sys.stdout
    sys.stdout = sys.stderr
    did_some_work = False
    for uri_str in self.args[0:len(self.args)-1]:
      for uri in self.WildcardIterator(uri_str).IterUris():
        did_some_work = True
        key = uri.get_key(False, self.headers)
        (elapsed_time, bytes_transferred) = self._PerformDownloadToStream(
            key, uri, stdout_fp, self.headers)
        self.total_elapsed_time += elapsed_time
        self.total_bytes_transferred += bytes_transferred
    if not did_some_work:
      raise CommandException('No URIs matched')
    if self.debug == 3:
      if self.total_bytes_transferred != 0:
        self.logger.info(
            'Total bytes copied=%d, total elapsed time=%5.3f secs (%sps)',
            self.total_bytes_transferred, self.total_elapsed_time,
            MakeHumanReadable(float(self.total_bytes_transferred) /
                              float(self.total_elapsed_time)))
2377
2378 def _StdinIterator(self):
2379 """A generator function that returns lines from stdin."""
2380 for line in sys.stdin:
2381 # Strip CRLF.
2382 yield line.rstrip()
2383
2384 def _SrcDstSame(self, src_uri, dst_uri):
2385 """Checks if src_uri and dst_uri represent the same object or file.
2386
2387 We don't handle anything about hard or symbolic links.
2388 928
2389 Args: 929 Args:
2390 src_uri: Source StorageUri. 930 exp_dst_url: Wildcard-expanded destination StorageUrl.
2391 dst_uri: Destination StorageUri. 931
932 Raises:
933 AccessDeniedException: if there was a permissions problem accessing the
934 bucket or its versioning config.
935 CommandException: if URL refers to a cloud bucket that does not exist.
2392 936
2393 Returns: 937 Returns:
2394 Bool indicator. 938 apitools Bucket with versioning configuration.
2395 """ 939 """
2396 if src_uri.is_file_uri() and dst_uri.is_file_uri(): 940 bucket = None
2397 # Translate a/b/./c to a/b/c, so src=dst comparison below works. 941 if exp_dst_url.IsCloudUrl() and exp_dst_url.IsBucket():
2398 new_src_path = os.path.normpath(src_uri.object_name) 942 try:
2399 new_dst_path = os.path.normpath(dst_uri.object_name) 943 bucket = self.gsutil_api.GetBucket(
2400 return (src_uri.clone_replace_name(new_src_path).uri == 944 exp_dst_url.bucket_name, provider=exp_dst_url.scheme,
2401 dst_uri.clone_replace_name(new_dst_path).uri) 945 fields=['versioning'])
2402 else: 946 except AccessDeniedException, e:
2403 return (src_uri.uri == dst_uri.uri and 947 raise
2404 src_uri.generation == dst_uri.generation and 948 except NotFoundException, e:
2405 src_uri.version_id == dst_uri.version_id) 949 raise CommandException('Destination bucket %s does not exist.' %
2406 950 exp_dst_url)
2407 def _ShouldTreatDstUriAsBucketSubDir(self, have_multiple_srcs, dst_uri, 951 except Exception, e:
2408 have_existing_dest_subdir): 952 raise CommandException('Error retrieving destination bucket %s: %s' %
2409 """ 953 (exp_dst_url, e.message))
2410 Checks whether dst_uri should be treated as a bucket "sub-directory". The 954 return bucket
2411 decision about whether something constitutes a bucket "sub-directory"
2412 depends on whether there are multiple sources in this request and whether
2413 there is an existing bucket subdirectory. For example, when running the
2414 command:
2415 gsutil cp file gs://bucket/abc
2416 if there's no existing gs://bucket/abc bucket subdirectory we should copy
2417 file to the object gs://bucket/abc. In contrast, if
2418 there's an existing gs://bucket/abc bucket subdirectory we should copy
2419 file to gs://bucket/abc/file. And regardless of whether gs://bucket/abc
2420 exists, when running the command:
2421 gsutil cp file1 file2 gs://bucket/abc
2422 we should copy file1 to gs://bucket/abc/file1 (and similarly for file2).
2423
2424 Note that we don't disallow naming a bucket "sub-directory" where there's
2425 already an object at that URI. For example it's legitimate (albeit
2426 confusing) to have an object called gs://bucket/dir and
2427 then run the command
2428 gsutil cp file1 file2 gs://bucket/dir
2429 Doing so will end up with objects gs://bucket/dir, gs://bucket/dir/file1,
2430 and gs://bucket/dir/file2.
2431
2432 Args:
2433 have_multiple_srcs: Bool indicator of whether this is a multi-source
2434 operation.
2435 dst_uri: StorageUri to check.
2436 have_existing_dest_subdir: bool indicator whether dest is an existing
2437 subdirectory.
2438
2439 Returns:
2440 bool indicator.
2441 """
2442 return ((have_multiple_srcs and dst_uri.is_cloud_uri())
2443 or (have_existing_dest_subdir))
2444
2445 def _ShouldTreatDstUriAsSingleton(self, have_multiple_srcs,
2446 have_existing_dest_subdir, dst_uri):
2447 """
2448 Checks that dst_uri names a singleton (file or object) after
2449 dir/wildcard expansion. The decision is more nuanced than simply
2450 dst_uri.names_singleton()) because of the possibility that an object path
2451 might name a bucket sub-directory.
2452
2453 Args:
2454 have_multiple_srcs: Bool indicator of whether this is a multi-source
2455 operation.
2456 have_existing_dest_subdir: bool indicator whether dest is an existing
2457 subdirectory.
2458 dst_uri: StorageUri to check.
2459
2460 Returns:
2461 bool indicator.
2462 """
2463 if have_multiple_srcs:
2464 # Only a file meets the criteria in this case.
2465 return dst_uri.names_file()
2466 return not have_existing_dest_subdir and dst_uri.names_singleton()
2467
2468 def _IsNoClobberServerException(self, e):
2469 """
2470 Checks to see if the server attempted to clobber a file after we specified
2471 in the header that we didn't want the file clobbered.
2472
2473 Args:
2474 e: The Exception that was generated by a failed copy operation
2475
2476 Returns:
2477 bool indicator - True indicates that the server did attempt to clobber
2478 an existing file.
2479 """
2480 return self.no_clobber and (
2481 (isinstance(e, GSResponseError) and e.status==412) or
2482 (isinstance(e, ResumableUploadException) and 'code 412' in e.message))
2483
2484
2485 class _Manifest(object):
2486 """Stores the manifest items for the CpCommand class."""
2487
2488 def __init__(self, path):
2489 # self.items contains a dictionary of rows
2490 self.items = {}
2491 self.manifest_filter = {}
2492 self.lock = multiprocessing.Manager().Lock()
2493
2494 self.manifest_path = os.path.expanduser(path)
2495 self._ParseManifest()
2496 self._CreateManifestFile()
2497
2498 def _ParseManifest(self):
2499 """
2500 Load and parse a manifest file. This information will be used to skip
2501 any files that have a skip or OK status.
2502 """
2503 try:
2504 if os.path.exists(self.manifest_path):
2505 with open(self.manifest_path, 'rb') as f:
2506 first_row = True
2507 reader = csv.reader(f)
2508 for row in reader:
2509 if first_row:
2510 try:
2511 source_index = row.index('Source')
2512 result_index = row.index('Result')
2513 except ValueError:
2514 # No header and thus not a valid manifest file.
2515 raise CommandException(
2516 'Missing headers in manifest file: %s' % self.manifest_path)
2517 first_row = False
2518 source = row[source_index]
2519 result = row[result_index]
2520 if result in ['OK', 'skip']:
2521 # We're always guaranteed to take the last result of a specific
2522 # source uri.
2523 self.manifest_filter[source] = result
2524 except IOError as ex:
2525 raise CommandException('Could not parse %s' % path)
2526
2527 def WasSuccessful(self, src):
2528 """ Returns whether the specified src uri was marked as successful."""
2529 return src in self.manifest_filter
2530
2531 def _CreateManifestFile(self):
2532 """Opens the manifest file and assigns it to the file pointer."""
2533 try:
2534 if ((not os.path.exists(self.manifest_path))
2535 or (os.stat(self.manifest_path).st_size == 0)):
2536 # Add headers to the new file.
2537 with open(self.manifest_path, 'wb', 1) as f:
2538 writer = csv.writer(f)
2539 writer.writerow(['Source',
2540 'Destination',
2541 'Start',
2542 'End',
2543 'Md5',
2544 'UploadId',
2545 'Source Size',
2546 'Bytes Transferred',
2547 'Result',
2548 'Description'])
2549 except IOError:
2550 raise CommandException('Could not create manifest file.')
2551
2552 def Set(self, uri, key, value):
2553 if value is None:
2554 # In case we don't have any information to set we bail out here.
2555 # This is so that we don't clobber existing information.
2556 # To zero information pass '' instead of None.
2557 return
2558 if uri in self.items:
2559 self.items[uri][key] = value
2560 else:
2561 self.items[uri] = {key:value}
2562
2563 def Initialize(self, source_uri, destination_uri):
2564 # Always use the source_uri as the key for the item. This is unique.
2565 self.Set(source_uri, 'source_uri', source_uri)
2566 self.Set(source_uri, 'destination_uri', destination_uri)
2567 self.Set(source_uri, 'start_time', datetime.datetime.utcnow())
2568
2569 def SetResult(self, source_uri, bytes_transferred, result,
2570 description=''):
2571 self.Set(source_uri, 'bytes', bytes_transferred)
2572 self.Set(source_uri, 'result', result)
2573 self.Set(source_uri, 'description', description)
2574 self.Set(source_uri, 'end_time', datetime.datetime.utcnow())
2575 self._WriteRowToManifestFile(source_uri)
2576 self._RemoveItemFromManifest(source_uri)
2577
2578 def _WriteRowToManifestFile(self, uri):
2579 row_item = self.items[uri]
2580 data = [
2581 str(row_item['source_uri']),
2582 str(row_item['destination_uri']),
2583 '%sZ' % row_item['start_time'].isoformat(),
2584 '%sZ' % row_item['end_time'].isoformat(),
2585 row_item['md5'] if 'md5' in row_item else '',
2586 row_item['upload_id'] if 'upload_id' in row_item else '',
2587 str(row_item['size']) if 'size' in row_item else '',
2588 str(row_item['bytes']) if 'bytes' in row_item else '',
2589 row_item['result'],
2590 row_item['description']]
2591
2592 # Aquire a lock to prevent multiple threads writing to the same file at
2593 # the same time. This would cause a garbled mess in the manifest file.
2594 with self.lock:
2595 with open(self.manifest_path, 'a', 1) as f: # 1 == line buffered
2596 writer = csv.writer(f)
2597 writer.writerow(data)
2598
2599 def _RemoveItemFromManifest(self, uri):
2600 # Remove the item from the dictionary since we're done with it and
2601 # we don't want the dictionary to grow too large in memory for no good
2602 # reason.
2603 del self.items[uri]
2604
2605
class ItemExistsError(Exception):
  """Raised when an object is skipped because the destination already exists."""
2609
2610
2611 def _GetPathBeforeFinalDir(uri):
2612 """
2613 Returns the part of the path before the final directory component for the
2614 given URI, handling cases for file system directories, bucket, and bucket
2615 subdirectories. Example: for gs://bucket/dir/ we'll return 'gs://bucket',
2616 and for file://dir we'll return file://
2617
2618 Args:
2619 uri: StorageUri.
2620
2621 Returns:
2622 String name of above-described path, sans final path separator.
2623 """
2624 sep = uri.delim
2625 # If the source uri argument had a wildcard and wasn't expanded by the
2626 # shell, then uri.names_file() will always be true, so we check for
2627 # this case explicitly.
2628 assert ((not uri.names_file()) or ContainsWildcard(uri.object_name))
2629 if uri.names_directory():
2630 past_scheme = uri.uri[len('file://'):]
2631 if past_scheme.find(sep) == -1:
2632 return 'file://'
2633 else:
2634 return 'file://%s' % past_scheme.rstrip(sep).rpartition(sep)[0]
2635 if uri.names_bucket():
2636 return '%s://' % uri.scheme
2637 # Else it names a bucket subdir.
2638 return uri.uri.rstrip(sep).rpartition(sep)[0]
2639
def _HashFilename(filename):
  """
  Apply a hash function (SHA1) to shorten the passed file name. The format
  of the hashed file name (as actually produced below) is:

    TRACKER_<hash>.<trailing>

  where hash is a SHA1 hash on the original file name and trailing is
  the last 16 chars from the original file name. Max file name lengths
  vary by operating system so the goal of this function is to ensure
  the hashed version takes fewer than 100 characters.

  Args:
    filename: file name to be hashed.

  Returns:
    shorter, hashed version of passed file name
  """
  # Normalize to UTF-8 bytes before hashing (Python 2 str/unicode handling;
  # hashlib requires a byte string).
  if isinstance(filename, unicode):
    filename = filename.encode('utf-8')
  else:
    filename = unicode(filename, 'utf8').encode('utf-8')
  m = hashlib.sha1(filename)
  return "TRACKER_" + m.hexdigest() + '.' + filename[-16:]
2664
2665 def _DivideAndCeil(dividend, divisor):
2666 """Returns ceil(dividend / divisor), taking care to avoid the pitfalls of
2667 floating point arithmetic that could otherwise yield the wrong result
2668 for large numbers.
2669 """
2670 quotient = dividend // divisor
2671 if (dividend % divisor) != 0:
2672 quotient += 1
2673 return quotient
2674
def _GetPartitionInfo(file_size, max_components, default_component_size):
  """Computes how to split a file into parallel composite upload components.

  Args:
    file_size: The number of bytes in the file to be partitioned.
    max_components: The maximum number of components that can be composed.
    default_component_size: The size of a component, assuming that
        max_components is infinite.

  Returns:
    Tuple of (number of components, component size). All components share
    the size except the last, which differs iff file_size is not a multiple
    of the component count.
  """
  # Ideal component count if there were no limit on how many we may compose.
  ideal_count = _DivideAndCeil(file_size, default_component_size)
  # Clamp the count into the allowed range [2, max_components].
  component_count = max(min(ideal_count, max_components), 2)
  per_component_size = _DivideAndCeil(file_size, component_count)
  return (component_count, per_component_size)
2696
2697 def _DeleteKeyFn(cls, key):
2698 """Wrapper function to be used with command.Apply()."""
2699 return key.delete_key()
2700
2701 def _ParseParallelUploadTrackerFile(tracker_file, tracker_file_lock):
2702 """Parse the tracker file (if any) from the last parallel composite upload
2703 attempt. The tracker file is of the format described in
2704 _CreateParallelUploadTrackerFile. If the file doesn't exist or cannot be
2705 read, then the upload will start from the beginning.
2706
2707 Args:
2708 tracker_file: The name of the file to parse.
2709 tracker_file_lock: Lock protecting access to the tracker file.
2710
2711 Returns:
2712 random_prefix: A randomly-generated prefix to the name of the
2713 temporary components.
2714 existing_objects: A list of ObjectFromTracker objects representing
2715 the set of files that have already been uploaded.
2716 """
2717 existing_objects = []
2718 try:
2719 with tracker_file_lock:
2720 f = open(tracker_file, 'r')
2721 lines = f.readlines()
2722 lines = [line.strip() for line in lines]
2723 f.close()
2724 except IOError as e:
2725 # We can't read the tracker file, so generate a new random prefix.
2726 lines = [str(random.randint(1, (10 ** 10) - 1))]
2727
2728 # Ignore non-existent file (happens first time an upload
2729 # is attempted on a file), but warn user for other errors.
2730 if e.errno != errno.ENOENT:
2731 # Will restart because we failed to read in the file.
2732 print('Couldn\'t read parallel upload tracker file (%s): %s. '
2733 'Restarting upload from scratch.' % (tracker_file, e.strerror))
2734
2735 # The first line contains the randomly-generated prefix.
2736 random_prefix = lines[0]
2737
2738 # The remaining lines were written in pairs to describe a single component
2739 # in the form:
2740 # object_name (without random prefix)
2741 # generation
2742 # Newlines are used as the delimiter because only newlines and carriage
2743 # returns are invalid characters in object names, and users can specify
2744 # a custom prefix in the config file.
2745 i = 1
2746 while i < len(lines):
2747 (name, generation) = (lines[i], lines[i+1])
2748 if generation == '':
2749 generation = None
2750 existing_objects.append(ObjectFromTracker(name, generation))
2751 i += 2
2752 return (random_prefix, existing_objects)
2753
def _AppendComponentTrackerToParallelUploadTrackerFile(tracker_file, component,
                                                       tracker_file_lock):
  """Appends a record of one uploaded component to the tracker file.

  The record follows the format described in
  _CreateParallelUploadTrackerFile.
  """
  component_lines = _GetParallelUploadTrackerFileLinesForComponents(
      [component])
  payload = ''.join('%s\n' % line for line in component_lines)
  with tracker_file_lock:
    with open(tracker_file, 'a') as f:
      f.write(payload)
2764
def _CreateParallelUploadTrackerFile(tracker_file, random_prefix, components,
                                     tracker_file_lock):
  """Writes a tracker file recording successfully uploaded components.

  Allows the upload to be resumed at a later date. The tracker file has
  the format:
    random_prefix
    temp_object_1_name
    temp_object_1_generation
    .
    .
    .
    temp_object_N_name
    temp_object_N_generation
  where N is the number of components that have been successfully uploaded.

  Args:
    tracker_file: The name of the parallel upload tracker file.
    random_prefix: The randomly-generated prefix that was used for
        uploading any existing components.
    components: A list of ObjectFromTracker objects that were uploaded.
    tracker_file_lock: Lock protecting access to the tracker file.
  """
  lines = [random_prefix]
  lines += _GetParallelUploadTrackerFileLinesForComponents(components)
  lines = [line + '\n' for line in lines]
  with tracker_file_lock:
    # Mode 'w' already truncates the file, so the previous extra
    # open(tracker_file, 'w').close() "clear" call was redundant.
    with open(tracker_file, 'w') as f:
      f.writelines(lines)
2793
2794 def _GetParallelUploadTrackerFileLinesForComponents(components):
2795 """Return a list of the lines that should appear in the parallel composite
2796 upload tracker file representing the given components, using the format
2797 as described in _CreateParallelUploadTrackerFile."""
2798 lines = []
2799 for component in components:
2800 generation = None
2801 generation = component.generation
2802 if not generation:
2803 generation = ''
2804 lines += [component.object_name, generation]
2805 return lines
2806
def FilterExistingComponents(dst_args, existing_components,
                             bucket_name, suri_builder):
  """Given the list of all target objects based on partitioning the file and
  the list of objects that have already been uploaded successfully,
  this function determines which objects should be uploaded, which
  existing components are still valid, and which existing components should
  be deleted.

  Args:
    dst_args: The map of file_name -> PerformResumableUploadIfAppliesArgs
              calculated by partitioning the file.
    existing_components: A list of ObjectFromTracker objects that have been
                         uploaded in the past.
    bucket_name: The name of the bucket in which the components exist.
    suri_builder: Used to construct StorageUris for stale components that
                  must be deleted.

  Returns:
    components_to_upload: List of components that need to be uploaded.
    uploaded_components: List of components that have already been
                         uploaded and are still valid.
    existing_objects_to_delete: List of components that have already
                                been uploaded, but are no longer valid
                                and are in a versioned bucket, and
                                therefore should be deleted.
  """
  # Any partition for which no previously uploaded component exists must be
  # uploaded.
  components_to_upload = []
  existing_component_names = [component.object_name
                              for component in existing_components]
  for component_name in dst_args:
    if not (component_name in existing_component_names):
      components_to_upload.append(dst_args[component_name])

  objects_already_chosen = []

  # Don't reuse any temporary components whose MD5 doesn't match the current
  # MD5 of the corresponding part of the file. If the bucket is versioned,
  # also make sure that we delete the existing temporary version.
  existing_objects_to_delete = []
  uploaded_components = []
  for tracker_object in existing_components:
    if ((not tracker_object.object_name in dst_args.keys())
        or tracker_object.object_name in objects_already_chosen):
      # This could happen if the component size has changed. This also serves
      # to handle object names that get duplicated in the tracker file due
      # to people doing things they shouldn't (e.g., overwriting an existing
      # temporary component in a versioned bucket).

      uri = MakeGsUri(bucket_name, tracker_object.object_name, suri_builder)
      uri.generation = tracker_object.generation
      existing_objects_to_delete.append(uri)
      continue

    dst_arg = dst_args[tracker_object.object_name]
    file_part = FilePart(dst_arg.filename, dst_arg.file_start,
                         dst_arg.file_length)
    # TODO: calculate MD5's in parallel when possible.
    content_md5 = _CalculateMd5FromContents(file_part)

    try:
      # Get the MD5 of the currently-existing component.
      # NOTE(review): the comparison below assumes the component's ETag is
      # its quoted MD5 digest -- TODO confirm this holds for all components.
      blr = BucketListingRef(dst_arg.dst_uri)
      etag = blr.GetKey().etag
    except Exception as e:
      # We don't actually care what went wrong - we couldn't retrieve the
      # object to check the MD5, so just upload it again.
      etag = None
    if etag != (('"%s"') % content_md5):
      # Stale or unreadable component: re-upload this part of the file.
      components_to_upload.append(dst_arg)
      objects_already_chosen.append(tracker_object.object_name)
      if tracker_object.generation:
        # If the old object doesn't have a generation (i.e., it isn't in a
        # versioned bucket), then we will just overwrite it anyway.
        invalid_component_with_generation = copy.deepcopy(dst_arg.dst_uri)
        invalid_component_with_generation.generation = tracker_object.generation
        existing_objects_to_delete.append(invalid_component_with_generation)
    else:
      # MD5 matches: reuse the previously uploaded component.
      uri = copy.deepcopy(dst_arg.dst_uri)
      uri.generation = tracker_object.generation
      uploaded_components.append(uri)
      objects_already_chosen.append(tracker_object.object_name)

  if uploaded_components:
    logging.info(("Found %d existing temporary components to reuse.")
                 % len(uploaded_components))

  return (components_to_upload, uploaded_components,
          existing_objects_to_delete)
2893
def MakeGsUri(bucket, filename, suri_builder):
  """Builds and returns a StorageUri for object filename in bucket."""
  object_path = '%s/%s' % (bucket, filename)
  return suri_builder.StorageUri(object_path)
2897
2898 def _CalculateMd5FromContents(file):
2899 """Calculates the MD5 hash of the contents of a file.
2900
2901 Args:
2902 file: An already-open file object.
2903 """
2904 current_md5 = md5()
2905 file.seek(0)
2906 while True:
2907 data = file.read(8192)
2908 if not data:
2909 break
2910 current_md5.update(data)
2911 file.seek(0)
2912 return current_md5.hexdigest()
OLDNEW
« no previous file with comments | « gslib/commands/cors.py ('k') | gslib/commands/defacl.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698