OLD | NEW |
| (Empty) |
1 # -*- coding: utf-8 -*- | |
2 # Copyright 2013 Google Inc. All Rights Reserved. | |
3 # | |
4 # Licensed under the Apache License, Version 2.0 (the "License"); | |
5 # you may not use this file except in compliance with the License. | |
6 # You may obtain a copy of the License at | |
7 # | |
8 # http://www.apache.org/licenses/LICENSE-2.0 | |
9 # | |
10 # Unless required by applicable law or agreed to in writing, software | |
11 # distributed under the License is distributed on an "AS IS" BASIS, | |
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 # See the License for the specific language governing permissions and | |
14 # limitations under the License. | |
15 """Integration tests for cp command.""" | |
16 | |
17 from __future__ import absolute_import | |
18 | |
19 import base64 | |
20 import binascii | |
21 import datetime | |
22 import httplib | |
23 import logging | |
24 import os | |
25 import pickle | |
26 import pkgutil | |
27 import random | |
28 import re | |
29 import string | |
30 import sys | |
31 | |
32 from apitools.base.py import exceptions as apitools_exceptions | |
33 import boto | |
34 from boto import storage_uri | |
35 from boto.exception import ResumableTransferDisposition | |
36 from boto.exception import ResumableUploadException | |
37 from boto.exception import StorageResponseError | |
38 from boto.storage_uri import BucketStorageUri | |
39 | |
40 from gslib.cloud_api import ResumableDownloadException | |
41 from gslib.cloud_api import ResumableUploadException | |
42 from gslib.cloud_api import ResumableUploadStartOverException | |
43 from gslib.copy_helper import GetTrackerFilePath | |
44 from gslib.copy_helper import TrackerFileType | |
45 from gslib.cs_api_map import ApiSelector | |
46 from gslib.gcs_json_api import GcsJsonApi | |
47 from gslib.hashing_helper import CalculateMd5FromContents | |
48 from gslib.storage_url import StorageUrlFromString | |
49 import gslib.tests.testcase as testcase | |
50 from gslib.tests.testcase.base import NotParallelizable | |
51 from gslib.tests.testcase.integration_testcase import SkipForS3 | |
52 from gslib.tests.util import GenerationFromURI as urigen | |
53 from gslib.tests.util import HAS_S3_CREDS | |
54 from gslib.tests.util import ObjectToURI as suri | |
55 from gslib.tests.util import PerformsFileToObjectUpload | |
56 from gslib.tests.util import SetBotoConfigForTest | |
57 from gslib.tests.util import unittest | |
58 from gslib.third_party.storage_apitools import storage_v1_messages as apitools_messages | |
59 from gslib.tracker_file import DeleteTrackerFile | |
60 from gslib.tracker_file import GetRewriteTrackerFilePath | |
61 from gslib.util import EIGHT_MIB | |
62 from gslib.util import IS_WINDOWS | |
63 from gslib.util import MakeHumanReadable | |
64 from gslib.util import ONE_KIB | |
65 from gslib.util import ONE_MIB | |
66 from gslib.util import Retry | |
67 from gslib.util import START_CALLBACK_PER_BYTES | |
68 from gslib.util import UTF8 | |
69 | |
70 | |
71 # Custom test callbacks must be pickleable, and therefore at global scope. | |
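# They are serialized with pickle.dumps() into a temp file and passed to | |
# gsutil via the --testcallbackfile flag by the resumable-transfer tests below. | |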
72 class _HaltingCopyCallbackHandler(object): | |
73 """Test callback handler for intentionally stopping a resumable transfer.""" | |
74 | |
75 def __init__(self, is_upload, halt_at_byte): | |
76 self._is_upload = is_upload | |
77 self._halt_at_byte = halt_at_byte | |
78 | |
79 # pylint: disable=invalid-name | |
80 def call(self, total_bytes_transferred, total_size): | |
81 """Forcibly exits if the transfer has passed the halting point.""" | |
82 if total_bytes_transferred >= self._halt_at_byte: | |
83 sys.stderr.write( | |
84 'Halting transfer after byte %s. %s/%s transferred.\r\n' % ( | |
85 self._halt_at_byte, MakeHumanReadable(total_bytes_transferred), | |
86 MakeHumanReadable(total_size))) | |
87 if self._is_upload: | |
88 raise ResumableUploadException('Artifically halting upload.') | |
89 else: | |
90 raise ResumableDownloadException('Artifically halting download.') | |
91 | |
92 | |
93 class _JSONForceHTTPErrorCopyCallbackHandler(object): | |
94 """Test callback handler that raises an arbitrary HTTP error exception.""" | |
95 | |
96 def __init__(self, startover_at_byte, http_error_num): | |
97 self._startover_at_byte = startover_at_byte | |
98 self._http_error_num = http_error_num | |
99 self.started_over_once = False | |
100 | |
101 # pylint: disable=invalid-name | |
102 def call(self, total_bytes_transferred, total_size): | |
103 """Forcibly exits if the transfer has passed the halting point.""" | |
104 if (total_bytes_transferred >= self._startover_at_byte | |
105 and not self.started_over_once): | |
106 sys.stderr.write( | |
107 'Forcing HTTP error %s after byte %s. ' | |
108 '%s/%s transferred.\r\n' % ( | |
109 self._http_error_num, | |
110 self._startover_at_byte, | |
111 MakeHumanReadable(total_bytes_transferred), | |
112 MakeHumanReadable(total_size))) | |
113 self.started_over_once = True | |
114 raise apitools_exceptions.HttpError( | |
115 {'status': self._http_error_num}, None, None) | |
116 | |
117 | |
118 class _XMLResumableUploadStartOverCopyCallbackHandler(object): | |
119 """Test callback handler that raises start-over exception during upload.""" | |
120 | |
121 def __init__(self, startover_at_byte): | |
122 self._startover_at_byte = startover_at_byte | |
123 self.started_over_once = False | |
124 | |
125 # pylint: disable=invalid-name | |
126 def call(self, total_bytes_transferred, total_size): | |
127 """Forcibly exits if the transfer has passed the halting point.""" | |
128 if (total_bytes_transferred >= self._startover_at_byte | |
129 and not self.started_over_once): | |
130 sys.stderr.write( | |
131 'Forcing ResumableUpload start over error after byte %s. ' | |
132 '%s/%s transferred.\r\n' % ( | |
133 self._startover_at_byte, | |
134 MakeHumanReadable(total_bytes_transferred), | |
135 MakeHumanReadable(total_size))) | |
136 self.started_over_once = True | |
137 raise boto.exception.ResumableUploadException( | |
138 'Forcing upload start over', | |
139 ResumableTransferDisposition.START_OVER) | |
140 | |
141 | |
142 class _DeleteBucketThenStartOverCopyCallbackHandler(object): | |
143 """Test callback handler that deletes bucket then raises start-over.""" | |
144 | |
145 def __init__(self, startover_at_byte, bucket_uri): | |
146 self._startover_at_byte = startover_at_byte | |
147 self._bucket_uri = bucket_uri | |
148 self.started_over_once = False | |
149 | |
150 # pylint: disable=invalid-name | |
151 def call(self, total_bytes_transferred, total_size): | |
152 """Forcibly exits if the transfer has passed the halting point.""" | |
153 if (total_bytes_transferred >= self._startover_at_byte | |
154 and not self.started_over_once): | |
155 sys.stderr.write('Deleting bucket (%s)' % (self._bucket_uri.bucket_name)) | |
156 | |
157 @Retry(StorageResponseError, tries=5, timeout_secs=1) | |
158 def DeleteBucket(): | |
159 bucket_list = list(self._bucket_uri.list_bucket(all_versions=True)) | |
160 for k in bucket_list: | |
161 self._bucket_uri.get_bucket().delete_key(k.name, | |
162 version_id=k.version_id) | |
163 self._bucket_uri.delete_bucket() | |
164 | |
165 DeleteBucket() | |
166 sys.stderr.write( | |
167 'Forcing ResumableUpload start over error after byte %s. ' | |
168 '%s/%s transferred.\r\n' % ( | |
169 self._startover_at_byte, | |
170 MakeHumanReadable(total_bytes_transferred), | |
171 MakeHumanReadable(total_size))) | |
172 self.started_over_once = True | |
173 raise ResumableUploadStartOverException( | |
174 'Artificially forcing start-over') | |
175 | |
176 | |
177 class _RewriteHaltException(Exception): | |
178 pass | |
179 | |
180 | |
181 class _HaltingRewriteCallbackHandler(object): | |
182 """Test callback handler for intentionally stopping a rewrite operation.""" | |
183 | |
184 def __init__(self, halt_at_byte): | |
185 self._halt_at_byte = halt_at_byte | |
186 | |
187 # pylint: disable=invalid-name | |
188 def call(self, total_bytes_rewritten, unused_total_size): | |
189 """Forcibly exits if the operation has passed the halting point.""" | |
190 if total_bytes_rewritten >= self._halt_at_byte: | |
191 raise _RewriteHaltException('Artificially halting rewrite') | |
192 | |
193 | |
194 class _EnsureRewriteResumeCallbackHandler(object): | |
195 """Test callback handler for ensuring a rewrite operation resumed.""" | |
196 | |
197 def __init__(self, required_byte): | |
198 self._required_byte = required_byte | |
199 | |
200 # pylint: disable=invalid-name | |
201 def call(self, total_bytes_rewritten, unused_total_size): | |
202 """Forcibly exits if the operation has passed the halting point.""" | |
203 if total_bytes_rewritten <= self._required_byte: | |
204 raise _RewriteHaltException( | |
205 'Rewrite did not resume; %s bytes written, but %s bytes should ' | |
206 'have already been written.' % (total_bytes_rewritten, | |
207 self._required_byte)) | |
208 | |
209 | |
210 class _ResumableUploadRetryHandler(object): | |
211 """Test callback handler for causing retries during a resumable transfer.""" | |
212 | |
213 def __init__(self, retry_at_byte, exception_to_raise, exc_args, | |
214 num_retries=1): | |
215 self._retry_at_byte = retry_at_byte | |
216 self._exception_to_raise = exception_to_raise | |
217 self._exception_args = exc_args | |
218 self._num_retries = num_retries | |
219 | |
220 self._retries_made = 0 | |
221 | |
222 # pylint: disable=invalid-name | |
223 def call(self, total_bytes_transferred, unused_total_size): | |
224 """Cause a single retry at the retry point.""" | |
225 if (total_bytes_transferred >= self._retry_at_byte | |
226 and self._retries_made < self._num_retries): | |
227 self._retries_made += 1 | |
228 raise self._exception_to_raise(*self._exception_args) | |
229 | |
230 | |
231 class TestCp(testcase.GsUtilIntegrationTestCase): | |
232 """Integration tests for cp command.""" | |
233 | |
234 # For tests that artificially halt, we need to ensure at least one callback | |
235 # occurs. | |
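# (Progress callbacks are expected roughly every START_CALLBACK_PER_BYTES | |
# bytes, so a halt_size transfer should invoke the handlers at least once.) | |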
236 halt_size = START_CALLBACK_PER_BYTES * 2 | |
237 | |
238 def _get_test_file(self, name): | |
239 contents = pkgutil.get_data('gslib', 'tests/test_data/%s' % name) | |
240 return self.CreateTempFile(file_name=name, contents=contents) | |
241 | |
242 @PerformsFileToObjectUpload | |
243 def test_noclobber(self): | |
244 key_uri = self.CreateObject(contents='foo') | |
245 fpath = self.CreateTempFile(contents='bar') | |
246 stderr = self.RunGsUtil(['cp', '-n', fpath, suri(key_uri)], | |
247 return_stderr=True) | |
248 self.assertIn('Skipping existing item: %s' % suri(key_uri), stderr) | |
249 self.assertEqual(key_uri.get_contents_as_string(), 'foo') | |
250 stderr = self.RunGsUtil(['cp', '-n', suri(key_uri), fpath], | |
251 return_stderr=True) | |
252 with open(fpath, 'r') as f: | |
253 self.assertIn('Skipping existing item: %s' % suri(f), stderr) | |
254 self.assertEqual(f.read(), 'bar') | |
255 | |
256 def test_dest_bucket_not_exist(self): | |
257 fpath = self.CreateTempFile(contents='foo') | |
258 invalid_bucket_uri = ( | |
259 '%s://%s' % (self.default_provider, self.nonexistent_bucket_name)) | |
260 stderr = self.RunGsUtil(['cp', fpath, invalid_bucket_uri], | |
261 expected_status=1, return_stderr=True) | |
262 self.assertIn('does not exist.', stderr) | |
263 | |
264 def test_copy_in_cloud_noclobber(self): | |
265 bucket1_uri = self.CreateBucket() | |
266 bucket2_uri = self.CreateBucket() | |
267 key_uri = self.CreateObject(bucket_uri=bucket1_uri, contents='foo') | |
268 stderr = self.RunGsUtil(['cp', suri(key_uri), suri(bucket2_uri)], | |
269 return_stderr=True) | |
270 # Rewrite API may output an additional 'Copying' progress notification. | |
271 self.assertGreaterEqual(stderr.count('Copying'), 1) | |
272 self.assertLessEqual(stderr.count('Copying'), 2) | |
273 stderr = self.RunGsUtil(['cp', '-n', suri(key_uri), suri(bucket2_uri)], | |
274 return_stderr=True) | |
275 self.assertIn('Skipping existing item: %s' % | |
276 suri(bucket2_uri, key_uri.object_name), stderr) | |
277 | |
278 @PerformsFileToObjectUpload | |
279 def test_streaming(self): | |
280 bucket_uri = self.CreateBucket() | |
281 stderr = self.RunGsUtil(['cp', '-', '%s' % suri(bucket_uri, 'foo')], | |
282 stdin='bar', return_stderr=True) | |
283 self.assertIn('Copying from <STDIN>', stderr) | |
284 key_uri = bucket_uri.clone_replace_name('foo') | |
285 self.assertEqual(key_uri.get_contents_as_string(), 'bar') | |
286 | |
287 def test_streaming_multiple_arguments(self): | |
288 bucket_uri = self.CreateBucket() | |
289 stderr = self.RunGsUtil(['cp', '-', '-', suri(bucket_uri)], | |
290 stdin='bar', return_stderr=True, expected_status=1) | |
291 self.assertIn('Multiple URL strings are not supported with streaming', | |
292 stderr) | |
293 | |
294 # TODO: Implement a way to test both with and without using magic file. | |
295 | |
296 @PerformsFileToObjectUpload | |
297 def test_detect_content_type(self): | |
298 """Tests local detection of content type.""" | |
299 bucket_uri = self.CreateBucket() | |
300 dsturi = suri(bucket_uri, 'foo') | |
301 | |
302 self.RunGsUtil(['cp', self._get_test_file('test.mp3'), dsturi]) | |
303 | |
304 # Use @Retry as hedge against bucket listing eventual consistency. | |
305 @Retry(AssertionError, tries=3, timeout_secs=1) | |
306 def _Check1(): | |
307 stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True) | |
308 if IS_WINDOWS: | |
309 self.assertTrue( | |
310 re.search(r'Content-Type:\s+audio/x-mpg', stdout) or | |
311 re.search(r'Content-Type:\s+audio/mpeg', stdout)) | |
312 else: | |
313 self.assertRegexpMatches(stdout, r'Content-Type:\s+audio/mpeg') | |
314 _Check1() | |
315 | |
316 self.RunGsUtil(['cp', self._get_test_file('test.gif'), dsturi]) | |
317 | |
318 # Use @Retry as hedge against bucket listing eventual consistency. | |
319 @Retry(AssertionError, tries=3, timeout_secs=1) | |
320 def _Check2(): | |
321 stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True) | |
322 self.assertRegexpMatches(stdout, r'Content-Type:\s+image/gif') | |
323 _Check2() | |
324 | |
325 def test_content_type_override_default(self): | |
326 """Tests overriding content type with the default value.""" | |
327 bucket_uri = self.CreateBucket() | |
328 dsturi = suri(bucket_uri, 'foo') | |
329 | |
330 self.RunGsUtil(['-h', 'Content-Type:', 'cp', | |
331 self._get_test_file('test.mp3'), dsturi]) | |
332 | |
333 # Use @Retry as hedge against bucket listing eventual consistency. | |
334 @Retry(AssertionError, tries=3, timeout_secs=1) | |
335 def _Check1(): | |
336 stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True) | |
337 self.assertRegexpMatches(stdout, | |
338 r'Content-Type:\s+application/octet-stream') | |
339 _Check1() | |
340 | |
341 self.RunGsUtil(['-h', 'Content-Type:', 'cp', | |
342 self._get_test_file('test.gif'), dsturi]) | |
343 | |
344 # Use @Retry as hedge against bucket listing eventual consistency. | |
345 @Retry(AssertionError, tries=3, timeout_secs=1) | |
346 def _Check2(): | |
347 stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True) | |
348 self.assertRegexpMatches(stdout, | |
349 r'Content-Type:\s+application/octet-stream') | |
350 _Check2() | |
351 | |
352 def test_content_type_override(self): | |
353 """Tests overriding content type with a value.""" | |
354 bucket_uri = self.CreateBucket() | |
355 dsturi = suri(bucket_uri, 'foo') | |
356 | |
357 self.RunGsUtil(['-h', 'Content-Type:text/plain', 'cp', | |
358 self._get_test_file('test.mp3'), dsturi]) | |
359 | |
360 # Use @Retry as hedge against bucket listing eventual consistency. | |
361 @Retry(AssertionError, tries=3, timeout_secs=1) | |
362 def _Check1(): | |
363 stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True) | |
364 self.assertRegexpMatches(stdout, r'Content-Type:\s+text/plain') | |
365 _Check1() | |
366 | |
367 self.RunGsUtil(['-h', 'Content-Type:text/plain', 'cp', | |
368 self._get_test_file('test.gif'), dsturi]) | |
369 | |
370 # Use @Retry as hedge against bucket listing eventual consistency. | |
371 @Retry(AssertionError, tries=3, timeout_secs=1) | |
372 def _Check2(): | |
373 stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True) | |
374 self.assertRegexpMatches(stdout, r'Content-Type:\s+text/plain') | |
375 _Check2() | |
376 | |
377 @unittest.skipIf(IS_WINDOWS, 'magicfile is not available on Windows.') | |
378 @PerformsFileToObjectUpload | |
379 def test_magicfile_override(self): | |
380 """Tests content type override with magicfile value.""" | |
381 bucket_uri = self.CreateBucket() | |
382 dsturi = suri(bucket_uri, 'foo') | |
383 fpath = self.CreateTempFile(contents='foo/bar\n') | |
384 self.RunGsUtil(['cp', fpath, dsturi]) | |
385 | |
386 # Use @Retry as hedge against bucket listing eventual consistency. | |
387 @Retry(AssertionError, tries=3, timeout_secs=1) | |
388 def _Check1(): | |
389 stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True) | |
390 use_magicfile = boto.config.getbool('GSUtil', 'use_magicfile', False) | |
391 content_type = ('text/plain' if use_magicfile | |
392 else 'application/octet-stream') | |
393 self.assertRegexpMatches(stdout, r'Content-Type:\s+%s' % content_type) | |
394 _Check1() | |
395 | |
396 @PerformsFileToObjectUpload | |
397 def test_content_type_mismatches(self): | |
398 """Tests overriding content type when it does not match the file type.""" | |
399 bucket_uri = self.CreateBucket() | |
400 dsturi = suri(bucket_uri, 'foo') | |
401 fpath = self.CreateTempFile(contents='foo/bar\n') | |
402 | |
403 self.RunGsUtil(['-h', 'Content-Type:image/gif', 'cp', | |
404 self._get_test_file('test.mp3'), dsturi]) | |
405 | |
406 # Use @Retry as hedge against bucket listing eventual consistency. | |
407 @Retry(AssertionError, tries=3, timeout_secs=1) | |
408 def _Check1(): | |
409 stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True) | |
410 self.assertRegexpMatches(stdout, r'Content-Type:\s+image/gif') | |
411 _Check1() | |
412 | |
413 self.RunGsUtil(['-h', 'Content-Type:image/gif', 'cp', | |
414 self._get_test_file('test.gif'), dsturi]) | |
415 | |
416 # Use @Retry as hedge against bucket listing eventual consistency. | |
417 @Retry(AssertionError, tries=3, timeout_secs=1) | |
418 def _Check2(): | |
419 stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True) | |
420 self.assertRegexpMatches(stdout, r'Content-Type:\s+image/gif') | |
421 _Check2() | |
422 | |
423 self.RunGsUtil(['-h', 'Content-Type:image/gif', 'cp', fpath, dsturi]) | |
424 | |
425 # Use @Retry as hedge against bucket listing eventual consistency. | |
426 @Retry(AssertionError, tries=3, timeout_secs=1) | |
427 def _Check3(): | |
428 stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True) | |
429 self.assertRegexpMatches(stdout, r'Content-Type:\s+image/gif') | |
430 _Check3() | |
431 | |
432 @PerformsFileToObjectUpload | |
433 def test_content_type_header_case_insensitive(self): | |
434 """Tests that content type header is treated with case insensitivity.""" | |
435 bucket_uri = self.CreateBucket() | |
436 dsturi = suri(bucket_uri, 'foo') | |
437 fpath = self._get_test_file('test.gif') | |
438 | |
439 self.RunGsUtil(['-h', 'content-Type:text/plain', 'cp', | |
440 fpath, dsturi]) | |
441 | |
442 # Use @Retry as hedge against bucket listing eventual consistency. | |
443 @Retry(AssertionError, tries=3, timeout_secs=1) | |
444 def _Check1(): | |
445 stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True) | |
446 self.assertRegexpMatches(stdout, r'Content-Type:\s+text/plain') | |
447 self.assertNotRegexpMatches(stdout, r'image/gif') | |
448 _Check1() | |
449 | |
450 self.RunGsUtil(['-h', 'CONTENT-TYPE:image/gif', | |
451 '-h', 'content-type:image/gif', | |
452 'cp', fpath, dsturi]) | |
453 | |
454 # Use @Retry as hedge against bucket listing eventual consistency. | |
455 @Retry(AssertionError, tries=3, timeout_secs=1) | |
456 def _Check2(): | |
457 stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True) | |
458 self.assertRegexpMatches(stdout, r'Content-Type:\s+image/gif') | |
459 self.assertNotRegexpMatches(stdout, r'image/gif,\s*image/gif') | |
460 _Check2() | |
461 | |
462 @PerformsFileToObjectUpload | |
463 def test_other_headers(self): | |
464 """Tests that non-content-type headers are applied successfully on copy.""" | |
465 bucket_uri = self.CreateBucket() | |
466 dst_uri = suri(bucket_uri, 'foo') | |
467 fpath = self._get_test_file('test.gif') | |
468 | |
469 self.RunGsUtil(['-h', 'Cache-Control:public,max-age=12', | |
470 '-h', 'x-%s-meta-1:abcd' % self.provider_custom_meta, 'cp', | |
471 fpath, dst_uri]) | |
472 | |
473 stdout = self.RunGsUtil(['ls', '-L', dst_uri], return_stdout=True) | |
474 self.assertRegexpMatches(stdout, r'Cache-Control\s*:\s*public,max-age=12') | |
475 self.assertRegexpMatches(stdout, r'Metadata:\s*1:\s*abcd') | |
476 | |
477 dst_uri2 = suri(bucket_uri, 'bar') | |
478 self.RunGsUtil(['cp', dst_uri, dst_uri2]) | |
479 # Ensure metadata was preserved across copy. | |
480 stdout = self.RunGsUtil(['ls', '-L', dst_uri2], return_stdout=True) | |
481 self.assertRegexpMatches(stdout, r'Cache-Control\s*:\s*public,max-age=12') | |
482 self.assertRegexpMatches(stdout, r'Metadata:\s*1:\s*abcd') | |
483 | |
484 @PerformsFileToObjectUpload | |
485 def test_versioning(self): | |
486 """Tests copy with versioning.""" | |
487 bucket_uri = self.CreateVersionedBucket() | |
488 k1_uri = self.CreateObject(bucket_uri=bucket_uri, contents='data2') | |
489 k2_uri = self.CreateObject(bucket_uri=bucket_uri, contents='data1') | |
490 g1 = urigen(k2_uri) | |
491 self.RunGsUtil(['cp', suri(k1_uri), suri(k2_uri)]) | |
492 k2_uri = bucket_uri.clone_replace_name(k2_uri.object_name) | |
493 k2_uri = bucket_uri.clone_replace_key(k2_uri.get_key()) | |
494 g2 = urigen(k2_uri) | |
495 k2_uri.set_contents_from_string('data3') | |
496 g3 = urigen(k2_uri) | |
497 | |
498 fpath = self.CreateTempFile() | |
499 # Check to make sure current version is data3. | |
500 self.RunGsUtil(['cp', k2_uri.versionless_uri, fpath]) | |
501 with open(fpath, 'r') as f: | |
502 self.assertEqual(f.read(), 'data3') | |
503 | |
504 # Check contents of all three versions | |
505 self.RunGsUtil(['cp', '%s#%s' % (k2_uri.versionless_uri, g1), fpath]) | |
506 with open(fpath, 'r') as f: | |
507 self.assertEqual(f.read(), 'data1') | |
508 self.RunGsUtil(['cp', '%s#%s' % (k2_uri.versionless_uri, g2), fpath]) | |
509 with open(fpath, 'r') as f: | |
510 self.assertEqual(f.read(), 'data2') | |
511 self.RunGsUtil(['cp', '%s#%s' % (k2_uri.versionless_uri, g3), fpath]) | |
512 with open(fpath, 'r') as f: | |
513 self.assertEqual(f.read(), 'data3') | |
514 | |
515 # Copy first version to current and verify. | |
516 self.RunGsUtil(['cp', '%s#%s' % (k2_uri.versionless_uri, g1), | |
517 k2_uri.versionless_uri]) | |
518 self.RunGsUtil(['cp', k2_uri.versionless_uri, fpath]) | |
519 with open(fpath, 'r') as f: | |
520 self.assertEqual(f.read(), 'data1') | |
521 | |
522 # Attempt to specify a version-specific URI for destination. | |
523 stderr = self.RunGsUtil(['cp', fpath, k2_uri.uri], return_stderr=True, | |
524 expected_status=1) | |
525 self.assertIn('cannot be the destination for gsutil cp', stderr) | |
526 | |
527 @SkipForS3('S3 lists versioned objects in reverse timestamp order.') | |
528 def test_recursive_copying_versioned_bucket(self): | |
529 """Tests that cp -R with versioned buckets copies all versions in order.""" | |
530 bucket1_uri = self.CreateVersionedBucket() | |
531 bucket2_uri = self.CreateVersionedBucket() | |
532 | |
533 # Write two versions of an object to bucket1. | |
534 self.CreateObject(bucket_uri=bucket1_uri, object_name='k', contents='data0') | |
535 self.CreateObject(bucket_uri=bucket1_uri, object_name='k', | |
536 contents='longer_data1') | |
537 | |
538 self.AssertNObjectsInBucket(bucket1_uri, 2, versioned=True) | |
539 self.AssertNObjectsInBucket(bucket2_uri, 0, versioned=True) | |
540 | |
541 # Recursively copy to second versioned bucket. | |
542 self.RunGsUtil(['cp', '-R', suri(bucket1_uri, '*'), suri(bucket2_uri)]) | |
543 | |
544 # Use @Retry as hedge against bucket listing eventual consistency. | |
545 @Retry(AssertionError, tries=3, timeout_secs=1) | |
546 def _Check2(): | |
547 """Validates the results of the cp -R.""" | |
548 listing1 = self.RunGsUtil(['ls', '-la', suri(bucket1_uri)], | |
549 return_stdout=True).split('\n') | |
550 listing2 = self.RunGsUtil(['ls', '-la', suri(bucket2_uri)], | |
551 return_stdout=True).split('\n') | |
552 # 2 lines of listing output, 1 summary line, 1 empty line from \n split. | |
553 self.assertEquals(len(listing1), 4) | |
554 self.assertEquals(len(listing2), 4) | |
555 | |
556 # First object in each bucket should match in size and version-less name. | |
557 size1, _, uri_str1, _ = listing1[0].split() | |
558 self.assertEquals(size1, str(len('data0'))) | |
559 self.assertEquals(storage_uri(uri_str1).object_name, 'k') | |
560 size2, _, uri_str2, _ = listing2[0].split() | |
561 self.assertEquals(size2, str(len('data0'))) | |
562 self.assertEquals(storage_uri(uri_str2).object_name, 'k') | |
563 | |
564 # Similarly for second object in each bucket. | |
565 size1, _, uri_str1, _ = listing1[1].split() | |
566 self.assertEquals(size1, str(len('longer_data1'))) | |
567 self.assertEquals(storage_uri(uri_str1).object_name, 'k') | |
568 size2, _, uri_str2, _ = listing2[1].split() | |
569 self.assertEquals(size2, str(len('longer_data1'))) | |
570 self.assertEquals(storage_uri(uri_str2).object_name, 'k') | |
571 _Check2() | |
572 | |
573 @PerformsFileToObjectUpload | |
574 @SkipForS3('Preconditions not supported for S3.') | |
575 def test_cp_generation_zero_match(self): | |
576 """Tests that cp handles an object-not-exists precondition header.""" | |
577 bucket_uri = self.CreateBucket() | |
578 fpath1 = self.CreateTempFile(contents='data1') | |
579 # Match 0 means only write the object if it doesn't already exist. | |
580 gen_match_header = 'x-goog-if-generation-match:0' | |
581 | |
582 # First copy should succeed. | |
583 # TODO: This can fail (rarely) if the server returns a 5xx but actually | |
584 # commits the bytes. If we add restarts on small uploads, handle this | |
585 # case. | |
586 self.RunGsUtil(['-h', gen_match_header, 'cp', fpath1, suri(bucket_uri)]) | |
587 | |
588 # Second copy should fail with a precondition error. | |
589 stderr = self.RunGsUtil(['-h', gen_match_header, 'cp', fpath1, | |
590 suri(bucket_uri)], | |
591 return_stderr=True, expected_status=1) | |
592 self.assertIn('PreconditionException', stderr) | |
593 | |
594 @PerformsFileToObjectUpload | |
595 @SkipForS3('Preconditions not supported for S3.') | |
596 def test_cp_v_generation_match(self): | |
597 """Tests that cp -v option handles the if-generation-match header.""" | |
598 bucket_uri = self.CreateVersionedBucket() | |
599 k1_uri = self.CreateObject(bucket_uri=bucket_uri, contents='data1') | |
600 g1 = k1_uri.generation | |
601 | |
602 tmpdir = self.CreateTempDir() | |
603 fpath1 = self.CreateTempFile(tmpdir=tmpdir, contents='data2') | |
604 | |
605 gen_match_header = 'x-goog-if-generation-match:%s' % g1 | |
606 # First copy should succeed. | |
607 self.RunGsUtil(['-h', gen_match_header, 'cp', fpath1, suri(k1_uri)]) | |
608 | |
609 # Second copy should fail the precondition. | |
610 stderr = self.RunGsUtil(['-h', gen_match_header, 'cp', fpath1, | |
611 suri(k1_uri)], | |
612 return_stderr=True, expected_status=1) | |
613 | |
614 self.assertIn('PreconditionException', stderr) | |
615 | |
616 # Specifying a generation with -n should fail before the request hits the | |
617 # server. | |
618 stderr = self.RunGsUtil(['-h', gen_match_header, 'cp', '-n', fpath1, | |
619 suri(k1_uri)], | |
620 return_stderr=True, expected_status=1) | |
621 | |
622 self.assertIn('ArgumentException', stderr) | |
623 self.assertIn('Specifying x-goog-if-generation-match is not supported ' | |
624 'with cp -n', stderr) | |
625 | |
626 @PerformsFileToObjectUpload | |
627 def test_cp_nv(self): | |
628 """Tests that cp -nv works when skipping existing file.""" | |
629 bucket_uri = self.CreateVersionedBucket() | |
630 k1_uri = self.CreateObject(bucket_uri=bucket_uri, contents='data1') | |
631 | |
632 tmpdir = self.CreateTempDir() | |
633 fpath1 = self.CreateTempFile(tmpdir=tmpdir, contents='data2') | |
634 | |
635 # First copy should succeed. | |
636 self.RunGsUtil(['cp', '-nv', fpath1, suri(k1_uri)]) | |
637 | |
638 # Second copy should skip copying. | |
639 stderr = self.RunGsUtil(['cp', '-nv', fpath1, suri(k1_uri)], | |
640 return_stderr=True) | |
641 self.assertIn('Skipping existing item:', stderr) | |
642 | |
643 @PerformsFileToObjectUpload | |
644 @SkipForS3('S3 lists versioned objects in reverse timestamp order.') | |
645 def test_cp_v_option(self): | |
646 """"Tests that cp -v returns the created object's version-specific URI.""" | |
647 bucket_uri = self.CreateVersionedBucket() | |
648 k1_uri = self.CreateObject(bucket_uri=bucket_uri, contents='data1') | |
649 k2_uri = self.CreateObject(bucket_uri=bucket_uri, contents='data2') | |
650 | |
651 # Case 1: Upload file to object using one-shot PUT. | |
652 tmpdir = self.CreateTempDir() | |
653 fpath1 = self.CreateTempFile(tmpdir=tmpdir, contents='data1') | |
654 self._run_cp_minus_v_test('-v', fpath1, k2_uri.uri) | |
655 | |
656 # Case 2: Upload file to object using resumable upload. | |
657 size_threshold = ONE_KIB | |
658 boto_config_for_test = ('GSUtil', 'resumable_threshold', | |
659 str(size_threshold)) | |
660 with SetBotoConfigForTest([boto_config_for_test]): | |
661 file_as_string = os.urandom(size_threshold) | |
662 tmpdir = self.CreateTempDir() | |
663 fpath1 = self.CreateTempFile(tmpdir=tmpdir, contents=file_as_string) | |
664 self._run_cp_minus_v_test('-v', fpath1, k2_uri.uri) | |
665 | |
666 # Case 3: Upload stream to object. | |
667 self._run_cp_minus_v_test('-v', '-', k2_uri.uri) | |
668 | |
669 # Case 4: Download object to file. For this case we just expect output of | |
670 # gsutil cp -v to be the URI of the file. | |
671 tmpdir = self.CreateTempDir() | |
672 fpath1 = self.CreateTempFile(tmpdir=tmpdir) | |
673 dst_uri = storage_uri(fpath1) | |
674 stderr = self.RunGsUtil(['cp', '-v', suri(k1_uri), suri(dst_uri)], | |
675 return_stderr=True) | |
676 self.assertIn('Created: %s' % dst_uri.uri, stderr.split('\n')[-2]) | |
677 | |
678 # Case 5: Daisy-chain from object to object. | |
679 self._run_cp_minus_v_test('-Dv', k1_uri.uri, k2_uri.uri) | |
680 | |
681 # Case 6: Copy object to object in-the-cloud. | |
682 self._run_cp_minus_v_test('-v', k1_uri.uri, k2_uri.uri) | |
683 | |
684 def _run_cp_minus_v_test(self, opt, src_str, dst_str): | |
685 """Runs cp -v with the options and validates the results.""" | |
686 stderr = self.RunGsUtil(['cp', opt, src_str, dst_str], return_stderr=True) | |
687 match = re.search(r'Created: (.*)\n', stderr) | |
688 self.assertIsNotNone(match) | |
689 created_uri = match.group(1) | |
690 | |
691 # Use @Retry as hedge against bucket listing eventual consistency. | |
692 @Retry(AssertionError, tries=3, timeout_secs=1) | |
693 def _Check1(): | |
694 stdout = self.RunGsUtil(['ls', '-a', dst_str], return_stdout=True) | |
695 lines = stdout.split('\n') | |
696 # Final (most recent) object should match the "Created:" URI. This is the | |
697 # second-to-last line (the last element of the split is an empty string). | |
698 self.assertGreater(len(lines), 2) | |
699 self.assertEqual(created_uri, lines[-2]) | |
700 _Check1() | |
701 | |
702 @PerformsFileToObjectUpload | |
703 def test_stdin_args(self): | |
704 """Tests cp with the -I option.""" | |
705 tmpdir = self.CreateTempDir() | |
706 fpath1 = self.CreateTempFile(tmpdir=tmpdir, contents='data1') | |
707 fpath2 = self.CreateTempFile(tmpdir=tmpdir, contents='data2') | |
708 bucket_uri = self.CreateBucket() | |
709 self.RunGsUtil(['cp', '-I', suri(bucket_uri)], | |
710 stdin='\n'.join((fpath1, fpath2))) | |
711 | |
712 # Use @Retry as hedge against bucket listing eventual consistency. | |
713 @Retry(AssertionError, tries=3, timeout_secs=1) | |
714 def _Check1(): | |
715 stdout = self.RunGsUtil(['ls', suri(bucket_uri)], return_stdout=True) | |
716 self.assertIn(os.path.basename(fpath1), stdout) | |
717 self.assertIn(os.path.basename(fpath2), stdout) | |
718 self.assertNumLines(stdout, 2) | |
719 _Check1() | |
720 | |
721 def test_cross_storage_class_cloud_cp(self): | |
722 bucket1_uri = self.CreateBucket(storage_class='STANDARD') | |
723 bucket2_uri = self.CreateBucket( | |
724 storage_class='DURABLE_REDUCED_AVAILABILITY') | |
725 key_uri = self.CreateObject(bucket_uri=bucket1_uri, contents='foo') | |
726 # Server now allows copy-in-the-cloud across storage classes. | |
727 self.RunGsUtil(['cp', suri(key_uri), suri(bucket2_uri)]) | |
728 | |
729 @unittest.skipUnless(HAS_S3_CREDS, 'Test requires both S3 and GS credentials') | |
730 def test_cross_provider_cp(self): | |
731 s3_bucket = self.CreateBucket(provider='s3') | |
732 gs_bucket = self.CreateBucket(provider='gs') | |
733 s3_key = self.CreateObject(bucket_uri=s3_bucket, contents='foo') | |
734 gs_key = self.CreateObject(bucket_uri=gs_bucket, contents='bar') | |
735 self.RunGsUtil(['cp', suri(s3_key), suri(gs_bucket)]) | |
736 self.RunGsUtil(['cp', suri(gs_key), suri(s3_bucket)]) | |
737 | |
738 @unittest.skipUnless(HAS_S3_CREDS, 'Test requires both S3 and GS credentials') | |
739 @unittest.skip('This test performs a large copy but remains here for ' | |
740 'debugging purposes.') | |
741 def test_cross_provider_large_cp(self): | |
742 s3_bucket = self.CreateBucket(provider='s3') | |
743 gs_bucket = self.CreateBucket(provider='gs') | |
744 s3_key = self.CreateObject(bucket_uri=s3_bucket, contents='f'*1024*1024) | |
745 gs_key = self.CreateObject(bucket_uri=gs_bucket, contents='b'*1024*1024) | |
746 self.RunGsUtil(['cp', suri(s3_key), suri(gs_bucket)]) | |
747 self.RunGsUtil(['cp', suri(gs_key), suri(s3_bucket)]) | |
748 with SetBotoConfigForTest([ | |
749 ('GSUtil', 'resumable_threshold', str(ONE_KIB)), | |
750 ('GSUtil', 'json_resumable_chunk_size', str(ONE_KIB * 256))]): | |
751 # Ensure copy also works across json upload chunk boundaries. | |
752 self.RunGsUtil(['cp', suri(s3_key), suri(gs_bucket)]) | |
753 | |
754 @unittest.skip('This test is slow due to creating many objects, ' | |
755 'but remains here for debugging purposes.') | |
756 def test_daisy_chain_cp_file_sizes(self): | |
757 """Ensure daisy chain cp works with a wide of file sizes.""" | |
758 bucket_uri = self.CreateBucket() | |
759 bucket2_uri = self.CreateBucket() | |
760 exponent_cap = 28 # Up to 256 MiB in size. | |
761 for i in range(exponent_cap): | |
762 one_byte_smaller = 2**i - 1 | |
763 normal = 2**i | |
764 one_byte_larger = 2**i + 1 | |
765 self.CreateObject(bucket_uri=bucket_uri, contents='a'*one_byte_smaller) | |
766 self.CreateObject(bucket_uri=bucket_uri, contents='b'*normal) | |
767 self.CreateObject(bucket_uri=bucket_uri, contents='c'*one_byte_larger) | |
768 | |
769 self.AssertNObjectsInBucket(bucket_uri, exponent_cap*3) | |
770 self.RunGsUtil(['-m', 'cp', '-D', suri(bucket_uri, '**'), | |
771 suri(bucket2_uri)]) | |
772 | |
773 self.AssertNObjectsInBucket(bucket2_uri, exponent_cap*3) | |
774 | |
775 def test_daisy_chain_cp(self): | |
776 """Tests cp with the -D option.""" | |
777 bucket1_uri = self.CreateBucket(storage_class='STANDARD') | |
778 bucket2_uri = self.CreateBucket( | |
779 storage_class='DURABLE_REDUCED_AVAILABILITY') | |
780 key_uri = self.CreateObject(bucket_uri=bucket1_uri, contents='foo') | |
781 # Set some headers on the source object so we can verify that headers are | |
782 # preserved by daisy-chain copy. | |
783 self.RunGsUtil(['setmeta', '-h', 'Cache-Control:public,max-age=12', | |
784 '-h', 'Content-Type:image/gif', | |
785 '-h', 'x-%s-meta-1:abcd' % self.provider_custom_meta, | |
786 suri(key_uri)]) | |
787 # Set public-read (non-default) ACL so we can verify that cp -D -p works. | |
788 self.RunGsUtil(['acl', 'set', 'public-read', suri(key_uri)]) | |
789 acl_json = self.RunGsUtil(['acl', 'get', suri(key_uri)], return_stdout=True) | |
790 # Perform daisy-chain copy and verify that source object headers and ACL | |
791 # were preserved. Also specify -n option to test that gsutil correctly | |
792 # removes the x-goog-if-generation-match:0 header (set at upload time) when | |
793 # updating the ACL. | |
794 stderr = self.RunGsUtil(['cp', '-Dpn', suri(key_uri), suri(bucket2_uri)], | |
795 return_stderr=True) | |
796 self.assertNotIn('Copy-in-the-cloud disallowed', stderr) | |
797 | |
798 @Retry(AssertionError, tries=3, timeout_secs=1) | |
799 def _Check(): | |
800 uri = suri(bucket2_uri, key_uri.object_name) | |
801 stdout = self.RunGsUtil(['ls', '-L', uri], return_stdout=True) | |
802 self.assertRegexpMatches(stdout, r'Cache-Control:\s+public,max-age=12') | |
803 self.assertRegexpMatches(stdout, r'Content-Type:\s+image/gif') | |
804 self.assertRegexpMatches(stdout, r'Metadata:\s+1:\s+abcd') | |
805 new_acl_json = self.RunGsUtil(['acl', 'get', uri], return_stdout=True) | |
806 self.assertEqual(acl_json, new_acl_json) | |
807 _Check() | |
808 | |
809 def test_daisy_chain_cp_download_failure(self): | |
810 """Tests cp with the -D option when the download thread dies.""" | |
811 bucket1_uri = self.CreateBucket() | |
812 bucket2_uri = self.CreateBucket() | |
813 key_uri = self.CreateObject(bucket_uri=bucket1_uri, | |
814 contents='a' * self.halt_size) | |
815 boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) | |
816 test_callback_file = self.CreateTempFile( | |
817 contents=pickle.dumps(_HaltingCopyCallbackHandler(False, 5))) | |
818 with SetBotoConfigForTest([boto_config_for_test]): | |
819 stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file, | |
820 '-D', suri(key_uri), suri(bucket2_uri)], | |
821 expected_status=1, return_stderr=True) | |
822 # Should have two exception traces; one from the download thread and | |
823 # one from the upload thread. | |
824 self.assertEqual(stderr.count( | |
825 'ResumableDownloadException: Artifically halting download'), 2) | |
826 | |
827 def test_canned_acl_cp(self): | |
828 """Tests copying with a canned ACL.""" | |
829 bucket1_uri = self.CreateBucket() | |
830 bucket2_uri = self.CreateBucket() | |
831 key_uri = self.CreateObject(bucket_uri=bucket1_uri, contents='foo') | |
832 self.RunGsUtil(['cp', '-a', 'public-read', suri(key_uri), | |
833 suri(bucket2_uri)]) | |
834 # Set public-read on the original key after the copy so we can compare | |
835 # the ACLs. | |
836 self.RunGsUtil(['acl', 'set', 'public-read', suri(key_uri)]) | |
837 public_read_acl = self.RunGsUtil(['acl', 'get', suri(key_uri)], | |
838 return_stdout=True) | |
839 | |
840 @Retry(AssertionError, tries=3, timeout_secs=1) | |
841 def _Check(): | |
842 uri = suri(bucket2_uri, key_uri.object_name) | |
843 new_acl_json = self.RunGsUtil(['acl', 'get', uri], return_stdout=True) | |
844 self.assertEqual(public_read_acl, new_acl_json) | |
845 _Check() | |
846 | |
847 @PerformsFileToObjectUpload | |
848 def test_canned_acl_upload(self): | |
849 """Tests uploading a file with a canned ACL.""" | |
850 bucket1_uri = self.CreateBucket() | |
851 key_uri = self.CreateObject(bucket_uri=bucket1_uri, contents='foo') | |
852 # Set public-read on the object so we can compare the ACLs. | |
853 self.RunGsUtil(['acl', 'set', 'public-read', suri(key_uri)]) | |
854 public_read_acl = self.RunGsUtil(['acl', 'get', suri(key_uri)], | |
855 return_stdout=True) | |
856 | |
857 file_name = 'bar' | |
858 fpath = self.CreateTempFile(file_name=file_name, contents='foo') | |
859 self.RunGsUtil(['cp', '-a', 'public-read', fpath, suri(bucket1_uri)]) | |
860 new_acl_json = self.RunGsUtil(['acl', 'get', suri(bucket1_uri, file_name)], | |
861 return_stdout=True) | |
862 self.assertEqual(public_read_acl, new_acl_json) | |
863 | |
864 resumable_size = ONE_KIB | |
865 boto_config_for_test = ('GSUtil', 'resumable_threshold', | |
866 str(resumable_size)) | |
867 with SetBotoConfigForTest([boto_config_for_test]): | |
868 resumable_file_name = 'resumable_bar' | |
869 resumable_contents = os.urandom(resumable_size) | |
870 resumable_fpath = self.CreateTempFile( | |
871 file_name=resumable_file_name, contents=resumable_contents) | |
872 self.RunGsUtil(['cp', '-a', 'public-read', resumable_fpath, | |
873 suri(bucket1_uri)]) | |
874 new_resumable_acl_json = self.RunGsUtil( | |
875 ['acl', 'get', suri(bucket1_uri, resumable_file_name)], | |
876 return_stdout=True) | |
877 self.assertEqual(public_read_acl, new_resumable_acl_json) | |
878 | |
879 def test_cp_key_to_local_stream(self): | |
880 bucket_uri = self.CreateBucket() | |
881 contents = 'foo' | |
882 key_uri = self.CreateObject(bucket_uri=bucket_uri, contents=contents) | |
883 stdout = self.RunGsUtil(['cp', suri(key_uri), '-'], return_stdout=True) | |
884 self.assertIn(contents, stdout) | |
885 | |
886 def test_cp_local_file_to_local_stream(self): | |
887 contents = 'content' | |
888 fpath = self.CreateTempFile(contents=contents) | |
889 stdout = self.RunGsUtil(['cp', fpath, '-'], return_stdout=True) | |
890 self.assertIn(contents, stdout) | |
891 | |
892 @PerformsFileToObjectUpload | |
893 def test_cp_zero_byte_file(self): | |
894 dst_bucket_uri = self.CreateBucket() | |
895 src_dir = self.CreateTempDir() | |
896 fpath = os.path.join(src_dir, 'zero_byte') | |
897 with open(fpath, 'w') as unused_out_file: | |
898 pass # Write a zero byte file | |
899 self.RunGsUtil(['cp', fpath, suri(dst_bucket_uri)]) | |
900 | |
901 @Retry(AssertionError, tries=3, timeout_secs=1) | |
902 def _Check1(): | |
903 stdout = self.RunGsUtil(['ls', suri(dst_bucket_uri)], return_stdout=True) | |
904 self.assertIn(os.path.basename(fpath), stdout) | |
905 _Check1() | |
906 | |
907 download_path = os.path.join(src_dir, 'zero_byte_download') | |
908 self.RunGsUtil(['cp', suri(dst_bucket_uri, 'zero_byte'), download_path]) | |
909 self.assertTrue(os.stat(download_path)) | |
910 | |
911 def test_copy_bucket_to_bucket(self): | |
912 """Tests that recursively copying from bucket to bucket. | |
913 | |
914 This should produce identically named objects (and not, in particular, | |
915 destination objects named by the version-specific URI from source objects). | |
916 """ | |
917 src_bucket_uri = self.CreateVersionedBucket() | |
918 dst_bucket_uri = self.CreateVersionedBucket() | |
919 self.CreateObject(bucket_uri=src_bucket_uri, object_name='obj0', | |
920 contents='abc') | |
921 self.CreateObject(bucket_uri=src_bucket_uri, object_name='obj1', | |
922 contents='def') | |
923 | |
924 # Use @Retry as hedge against bucket listing eventual consistency. | |
925 @Retry(AssertionError, tries=3, timeout_secs=1) | |
926 def _CopyAndCheck(): | |
927 self.RunGsUtil(['cp', '-R', suri(src_bucket_uri), | |
928 suri(dst_bucket_uri)]) | |
929 stdout = self.RunGsUtil(['ls', '-R', dst_bucket_uri.uri], | |
930 return_stdout=True) | |
931 self.assertIn('%s%s/obj0\n' % (dst_bucket_uri, | |
932 src_bucket_uri.bucket_name), stdout) | |
933 self.assertIn('%s%s/obj1\n' % (dst_bucket_uri, | |
934 src_bucket_uri.bucket_name), stdout) | |
935 _CopyAndCheck() | |
936 | |
937 def test_copy_bucket_to_dir(self): | |
938 """Tests recursively copying from bucket to a directory. | |
939 | |
940 This should produce identically named objects (and not, in particular, | |
941 destination objects named by the version-specific URI from source objects). | |
942 """ | |
943 src_bucket_uri = self.CreateBucket() | |
944 dst_dir = self.CreateTempDir() | |
945 self.CreateObject(bucket_uri=src_bucket_uri, object_name='obj0', | |
946 contents='abc') | |
947 self.CreateObject(bucket_uri=src_bucket_uri, object_name='obj1', | |
948 contents='def') | |
949 | |
950 # Use @Retry as hedge against bucket listing eventual consistency. | |
951 @Retry(AssertionError, tries=3, timeout_secs=1) | |
952 def _CopyAndCheck(): | |
953 """Copies the bucket recursively and validates the results.""" | |
954 self.RunGsUtil(['cp', '-R', suri(src_bucket_uri), dst_dir]) | |
955 dir_list = [] | |
956 for dirname, _, filenames in os.walk(dst_dir): | |
957 for filename in filenames: | |
958 dir_list.append(os.path.join(dirname, filename)) | |
959 dir_list = sorted(dir_list) | |
960 self.assertEqual(len(dir_list), 2) | |
961 self.assertEqual(os.path.join(dst_dir, src_bucket_uri.bucket_name, | |
962 'obj0'), dir_list[0]) | |
963 self.assertEqual(os.path.join(dst_dir, src_bucket_uri.bucket_name, | |
964 'obj1'), dir_list[1]) | |
965 _CopyAndCheck() | |
966 | |
967 def test_recursive_download_with_leftover_dir_placeholder(self): | |
968 """Tests that we correctly handle leftover dir placeholders.""" | |
969 src_bucket_uri = self.CreateBucket() | |
970 dst_dir = self.CreateTempDir() | |
971 self.CreateObject(bucket_uri=src_bucket_uri, object_name='obj0', | |
972 contents='abc') | |
973 self.CreateObject(bucket_uri=src_bucket_uri, object_name='obj1', | |
974 contents='def') | |
975 | |
976 # Create a placeholder like what can be left over by web GUI tools. | |
977 key_uri = src_bucket_uri.clone_replace_name('/') | |
978 key_uri.set_contents_from_string('') | |
979 self.AssertNObjectsInBucket(src_bucket_uri, 3) | |
980 | |
981 stderr = self.RunGsUtil(['cp', '-R', suri(src_bucket_uri), dst_dir], | |
982 return_stderr=True) | |
983 self.assertIn('Skipping cloud sub-directory placeholder object', stderr) | |
984 dir_list = [] | |
985 for dirname, _, filenames in os.walk(dst_dir): | |
986 for filename in filenames: | |
987 dir_list.append(os.path.join(dirname, filename)) | |
988 dir_list = sorted(dir_list) | |
989 self.assertEqual(len(dir_list), 2) | |
990 self.assertEqual(os.path.join(dst_dir, src_bucket_uri.bucket_name, | |
991 'obj0'), dir_list[0]) | |
992 self.assertEqual(os.path.join(dst_dir, src_bucket_uri.bucket_name, | |
993 'obj1'), dir_list[1]) | |
994 | |
995 def test_copy_quiet(self): | |
996 bucket_uri = self.CreateBucket() | |
997 key_uri = self.CreateObject(bucket_uri=bucket_uri, contents='foo') | |
998 stderr = self.RunGsUtil(['-q', 'cp', suri(key_uri), | |
999 suri(bucket_uri.clone_replace_name('o2'))], | |
1000 return_stderr=True) | |
1001 self.assertEqual(stderr.count('Copying '), 0) | |
1002 | |
1003 def test_cp_md5_match(self): | |
1004 """Tests that the uploaded object has the expected MD5. | |
1005 | |
1006 Note that while this does perform a file to object upload, MD5's are | |
1007 not supported for composite objects so we don't use the decorator in this | |
1008 case. | |
1009 """ | |
1010 bucket_uri = self.CreateBucket() | |
1011 fpath = self.CreateTempFile(contents='bar') | |
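# 'ls -L' reports the MD5 as the base64 encoding of the binary digest, so | |
# convert the hex digest from CalculateMd5FromContents before comparing. | |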
1012 with open(fpath, 'r') as f_in: | |
1013 file_md5 = base64.encodestring(binascii.unhexlify( | |
1014 CalculateMd5FromContents(f_in))).rstrip('\n') | |
1015 self.RunGsUtil(['cp', fpath, suri(bucket_uri)]) | |
1016 | |
1017 # Use @Retry as hedge against bucket listing eventual consistency. | |
1018 @Retry(AssertionError, tries=3, timeout_secs=1) | |
1019 def _Check1(): | |
1020 stdout = self.RunGsUtil(['ls', '-L', suri(bucket_uri)], | |
1021 return_stdout=True) | |
1022 self.assertRegexpMatches(stdout, | |
1023 r'Hash\s+\(md5\):\s+%s' % re.escape(file_md5)) | |
1024 _Check1() | |
1025 | |
1026 @unittest.skipIf(IS_WINDOWS, | |
1027 'Unicode handling on Windows requires mods to site-packages') | |
1028 @PerformsFileToObjectUpload | |
1029 def test_cp_manifest_upload_unicode(self): | |
1030 return self._ManifestUpload('foo-unicöde', 'bar-unicöde', | |
1031 'manifest-unicöde') | |
1032 | |
1033 @PerformsFileToObjectUpload | |
1034 def test_cp_manifest_upload(self): | |
1035 """Tests uploading with a mnifest file.""" | |
1036 return self._ManifestUpload('foo', 'bar', 'manifest') | |
1037 | |
1038 def _ManifestUpload(self, file_name, object_name, manifest_name): | |
1039 """Tests uploading with a manifest file.""" | |
1040 bucket_uri = self.CreateBucket() | |
1041 dsturi = suri(bucket_uri, object_name) | |
1042 | |
1043 fpath = self.CreateTempFile(file_name=file_name, contents='bar') | |
1044 logpath = self.CreateTempFile(file_name=manifest_name, contents='') | |
1045 # Ensure the file is empty. | |
1046 open(logpath, 'w').close() | |
1047 self.RunGsUtil(['cp', '-L', logpath, fpath, dsturi]) | |
1048 with open(logpath, 'r') as f: | |
1049 lines = f.readlines() | |
1050 self.assertEqual(len(lines), 2) | |
1051 | |
1052 expected_headers = ['Source', 'Destination', 'Start', 'End', 'Md5', | |
1053 'UploadId', 'Source Size', 'Bytes Transferred', | |
1054 'Result', 'Description'] | |
1055 self.assertEqual(expected_headers, lines[0].strip().split(',')) | |
1056 results = lines[1].strip().split(',') | |
1057 self.assertEqual(results[0][:7], 'file://') # source | |
1058 self.assertEqual(results[1][:5], '%s://' % | |
1059 self.default_provider) # destination | |
1060 date_format = '%Y-%m-%dT%H:%M:%S.%fZ' | |
1061 start_date = datetime.datetime.strptime(results[2], date_format) | |
1062 end_date = datetime.datetime.strptime(results[3], date_format) | |
1063 self.assertEqual(end_date > start_date, True) | |
1064 if self.RunGsUtil == testcase.GsUtilIntegrationTestCase.RunGsUtil: | |
1065 # Check that we didn't do automatic parallel uploads - compose doesn't | |
1066 # calculate the MD5 hash. Since RunGsUtil is overridden in | |
1067 # TestCpParallelUploads to force parallel uploads, we can check which | |
1068 # method was used. | |
1069 self.assertEqual(results[4], 'rL0Y20zC+Fzt72VPzMSk2A==') # md5 | |
1070 self.assertEqual(int(results[6]), 3) # Source Size | |
1071 self.assertEqual(int(results[7]), 3) # Bytes Transferred | |
1072 self.assertEqual(results[8], 'OK') # Result | |
1073 | |
1074 @PerformsFileToObjectUpload | |
1075 def test_cp_manifest_download(self): | |
1076 """Tests downloading with a manifest file.""" | |
1077 key_uri = self.CreateObject(contents='foo') | |
1078 fpath = self.CreateTempFile(contents='') | |
1079 logpath = self.CreateTempFile(contents='') | |
1080 # Ensure the file is empty. | |
1081 open(logpath, 'w').close() | |
1082 self.RunGsUtil(['cp', '-L', logpath, suri(key_uri), fpath], | |
1083 return_stdout=True) | |
1084 with open(logpath, 'r') as f: | |
1085 lines = f.readlines() | |
1086 self.assertEqual(len(lines), 2) | |
1087 | |
1088 expected_headers = ['Source', 'Destination', 'Start', 'End', 'Md5', | |
1089 'UploadId', 'Source Size', 'Bytes Transferred', | |
1090 'Result', 'Description'] | |
1091 self.assertEqual(expected_headers, lines[0].strip().split(',')) | |
1092 results = lines[1].strip().split(',') | |
1093 self.assertEqual(results[0][:5], '%s://' % | |
1094 self.default_provider) # source | |
1095 self.assertEqual(results[1][:7], 'file://') # destination | |
1096 date_format = '%Y-%m-%dT%H:%M:%S.%fZ' | |
1097 start_date = datetime.datetime.strptime(results[2], date_format) | |
1098 end_date = datetime.datetime.strptime(results[3], date_format) | |
1099 self.assertEqual(end_date > start_date, True) | |
1100 self.assertEqual(results[4], 'rL0Y20zC+Fzt72VPzMSk2A==') # md5 | |
1101 self.assertEqual(int(results[6]), 3) # Source Size | |
1102 # Bytes transferred might be more than 3 if the file was gzipped, since | |
1103 # the minimum gzip header is 10 bytes. | |
1104 self.assertGreaterEqual(int(results[7]), 3) # Bytes Transferred | |
1105 self.assertEqual(results[8], 'OK') # Result | |
1106 | |
1107 @PerformsFileToObjectUpload | |
1108 def test_copy_unicode_non_ascii_filename(self): | |
1109 key_uri = self.CreateObject(contents='foo') | |
1110 # Make file large enough to cause a resumable upload (which hashes filename | |
1111 # to construct tracker filename). | |
1112 fpath = self.CreateTempFile(file_name=u'Аудиоархив', | |
1113 contents='x' * 3 * 1024 * 1024) | |
1114 fpath_bytes = fpath.encode(UTF8) | |
1115 stderr = self.RunGsUtil(['cp', fpath_bytes, suri(key_uri)], | |
1116 return_stderr=True) | |
1117 self.assertIn('Copying file:', stderr) | |
1118 | |
1119 # Note: We originally implemented a test | |
1120 # (test_copy_invalid_unicode_filename) asserting that invalid unicode | |
1121 # filenames were skipped, but it turns out os.walk() on MacOS doesn't have | |
1122 # problems with such files (so that test failed). Given that, we decided to | |
1123 # remove the test. | |
1124 | |
1125 def test_gzip_upload_and_download(self): | |
1126 bucket_uri = self.CreateBucket() | |
1127 contents = 'x' * 10000 | |
1128 tmpdir = self.CreateTempDir() | |
1129 self.CreateTempFile(file_name='test.html', tmpdir=tmpdir, contents=contents) | |
1130 self.CreateTempFile(file_name='test.js', tmpdir=tmpdir, contents=contents) | |
1131 self.CreateTempFile(file_name='test.txt', tmpdir=tmpdir, contents=contents) | |
1132 # Test that copying with only 2 of the 3 extensions gzips the correct | |
1133 # files, and that including whitespace in the extension list works. | |
1134 self.RunGsUtil(['cp', '-z', 'js, html', | |
1135 os.path.join(tmpdir, 'test.*'), suri(bucket_uri)]) | |
1136 self.AssertNObjectsInBucket(bucket_uri, 3) | |
1137 uri1 = suri(bucket_uri, 'test.html') | |
1138 uri2 = suri(bucket_uri, 'test.js') | |
1139 uri3 = suri(bucket_uri, 'test.txt') | |
1140 stdout = self.RunGsUtil(['stat', uri1], return_stdout=True) | |
1141 self.assertRegexpMatches(stdout, r'Content-Encoding:\s+gzip') | |
1142 stdout = self.RunGsUtil(['stat', uri2], return_stdout=True) | |
1143 self.assertRegexpMatches(stdout, r'Content-Encoding:\s+gzip') | |
1144 stdout = self.RunGsUtil(['stat', uri3], return_stdout=True) | |
1145 self.assertNotRegexpMatches(stdout, r'Content-Encoding:\s+gzip') | |
1146 fpath4 = self.CreateTempFile() | |
1147 for uri in (uri1, uri2, uri3): | |
1148 self.RunGsUtil(['cp', uri, suri(fpath4)]) | |
1149 with open(fpath4, 'r') as f: | |
1150 self.assertEqual(f.read(), contents) | |
1151 | |
1152 def test_upload_with_subdir_and_unexpanded_wildcard(self): | |
1153 fpath1 = self.CreateTempFile(file_name=('tmp', 'x', 'y', 'z')) | |
1154 bucket_uri = self.CreateBucket() | |
1155 wildcard_uri = '%s*' % fpath1[:-5] | |
1156 stderr = self.RunGsUtil(['cp', '-R', wildcard_uri, suri(bucket_uri)], | |
1157 return_stderr=True) | |
1158 self.assertIn('Copying file:', stderr) | |
1159 self.AssertNObjectsInBucket(bucket_uri, 1) | |
1160 | |
1161 def test_cp_object_ending_with_slash(self): | |
1162 """Tests that cp works with object names ending with slash.""" | |
1163 tmpdir = self.CreateTempDir() | |
1164 bucket_uri = self.CreateBucket() | |
1165 self.CreateObject(bucket_uri=bucket_uri, | |
1166 object_name='abc/', | |
1167 contents='dir') | |
1168 self.CreateObject(bucket_uri=bucket_uri, | |
1169 object_name='abc/def', | |
1170 contents='def') | |
1171 self.AssertNObjectsInBucket(bucket_uri, 2) | |
1172 self.RunGsUtil(['cp', '-R', suri(bucket_uri), tmpdir]) | |
1173 # Check that files in the subdir got copied even though subdir object | |
1174 # download was skipped. | |
1175 with open(os.path.join(tmpdir, bucket_uri.bucket_name, 'abc', 'def')) as f: | |
1176 self.assertEquals('def', '\n'.join(f.readlines())) | |
1177 | |
1178 def test_cp_without_read_access(self): | |
1179 """Tests that cp fails without read access to the object.""" | |
1180 # TODO: With 401's triggering retries in apitools, this test will take | |
1181 # a long time. Ideally, make apitools accept a num_retries config for this | |
1182 # until we stop retrying the 401's. | |
1183 bucket_uri = self.CreateBucket() | |
1184 object_uri = self.CreateObject(bucket_uri=bucket_uri, contents='foo') | |
1185 | |
1186 # Use @Retry as hedge against bucket listing eventual consistency. | |
1187 self.AssertNObjectsInBucket(bucket_uri, 1) | |
1188 | |
1189 with self.SetAnonymousBotoCreds(): | |
1190 stderr = self.RunGsUtil(['cp', suri(object_uri), 'foo'], | |
1191 return_stderr=True, expected_status=1) | |
1192 self.assertIn('AccessDenied', stderr) | |
1193 | |
1194 @unittest.skipIf(IS_WINDOWS, 'os.symlink() is not available on Windows.') | |
1195 def test_cp_minus_e(self): | |
1196 fpath_dir = self.CreateTempDir() | |
1197 fpath1 = self.CreateTempFile(tmpdir=fpath_dir) | |
1198 fpath2 = os.path.join(fpath_dir, 'cp_minus_e') | |
1199 bucket_uri = self.CreateBucket() | |
1200 os.symlink(fpath1, fpath2) | |
1201 stderr = self.RunGsUtil( | |
1202 ['cp', '-e', '%s%s*' % (fpath_dir, os.path.sep), | |
1203 suri(bucket_uri, 'files')], | |
1204 return_stderr=True) | |
1205 self.assertIn('Copying file', stderr) | |
1206 self.assertIn('Skipping symbolic link file', stderr) | |
1207 | |
1208 def test_cp_multithreaded_wildcard(self): | |
1209 """Tests that cp -m works with a wildcard.""" | |
1210 num_test_files = 5 | |
1211 tmp_dir = self.CreateTempDir(test_files=num_test_files) | |
1212 bucket_uri = self.CreateBucket() | |
1213 wildcard_uri = '%s%s*' % (tmp_dir, os.sep) | |
1214 self.RunGsUtil(['-m', 'cp', wildcard_uri, suri(bucket_uri)]) | |
1215 self.AssertNObjectsInBucket(bucket_uri, num_test_files) | |
1216 | |
1217 def test_cp_duplicate_source_args(self): | |
1218 """Tests that cp -m works when a source argument is provided twice.""" | |
1219 object_contents = 'edge' | |
1220 object_uri = self.CreateObject(object_name='foo', contents=object_contents) | |
1221 tmp_dir = self.CreateTempDir() | |
1222 self.RunGsUtil(['-m', 'cp', suri(object_uri), suri(object_uri), tmp_dir]) | |
1223 with open(os.path.join(tmp_dir, 'foo'), 'r') as in_fp: | |
1224 contents = in_fp.read() | |
1225 # Contents should not be duplicated. | |
1226 self.assertEqual(contents, object_contents) | |
1227 | |
1228 @SkipForS3('No resumable upload support for S3.') | |
1229 def test_cp_resumable_upload_break(self): | |
1230 """Tests that an upload can be resumed after a connection break.""" | |
1231 bucket_uri = self.CreateBucket() | |
1232 fpath = self.CreateTempFile(contents='a' * self.halt_size) | |
1233 boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) | |
1234 test_callback_file = self.CreateTempFile( | |
1235 contents=pickle.dumps(_HaltingCopyCallbackHandler(True, 5))) | |
1236 | |
1237 with SetBotoConfigForTest([boto_config_for_test]): | |
1238 stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file, | |
1239 fpath, suri(bucket_uri)], | |
1240 expected_status=1, return_stderr=True) | |
1241 self.assertIn('Artifically halting upload', stderr) | |
1242 stderr = self.RunGsUtil(['cp', fpath, suri(bucket_uri)], | |
1243 return_stderr=True) | |
1244 self.assertIn('Resuming upload', stderr) | |
1245 | |
1246 @SkipForS3('No resumable upload support for S3.') | |
1247 def test_cp_resumable_upload_retry(self): | |
1248 """Tests that a resumable upload completes with one retry.""" | |
1249 bucket_uri = self.CreateBucket() | |
1250 fpath = self.CreateTempFile(contents='a' * self.halt_size) | |
1251 # TODO: Raising an httplib or socket error blocks bucket teardown | |
1252 # in JSON for 60-120s on a multiprocessing lock acquire. Figure out why; | |
1253 # until then, raise an apitools retryable exception. | |
1254 if self.test_api == ApiSelector.XML: | |
1255 test_callback_file = self.CreateTempFile( | |
1256 contents=pickle.dumps(_ResumableUploadRetryHandler( | |
1257 5, httplib.BadStatusLine, ('unused',)))) | |
1258 else: | |
1259 test_callback_file = self.CreateTempFile( | |
1260 contents=pickle.dumps(_ResumableUploadRetryHandler( | |
1261 5, apitools_exceptions.BadStatusCodeError, | |
1262 ('unused', 'unused', 'unused')))) | |
1263 boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) | |
1264 with SetBotoConfigForTest([boto_config_for_test]): | |
1265 stderr = self.RunGsUtil(['-D', 'cp', '--testcallbackfile', | |
1266 test_callback_file, fpath, suri(bucket_uri)], | |
1267 return_stderr=True) | |
1268 if self.test_api == ApiSelector.XML: | |
1269 self.assertIn('Got retryable failure', stderr) | |
1270 else: | |
1271 self.assertIn('Retrying', stderr) | |
1272 | |
1273 @SkipForS3('No resumable upload support for S3.') | |
1274 def test_cp_resumable_streaming_upload_retry(self): | |
1275 """Tests that a streaming resumable upload completes with one retry.""" | |
1276 if self.test_api == ApiSelector.XML: | |
1277 return self.skipTest('XML does not support resumable streaming uploads.') | |
1278 bucket_uri = self.CreateBucket() | |
1279 | |
1280 test_callback_file = self.CreateTempFile( | |
1281 contents=pickle.dumps(_ResumableUploadRetryHandler( | |
1282 5, apitools_exceptions.BadStatusCodeError, | |
1283 ('unused', 'unused', 'unused')))) | |
1284 # Need to reduce the JSON chunk size since streaming uploads buffer a | |
1285 # full chunk. | |
1286 boto_configs_for_test = [('GSUtil', 'json_resumable_chunk_size', | |
1287 str(256 * ONE_KIB)), | |
1288 ('Boto', 'num_retries', '2')] | |
1289 with SetBotoConfigForTest(boto_configs_for_test): | |
1290 stderr = self.RunGsUtil( | |
1291 ['-D', 'cp', '--testcallbackfile', test_callback_file, '-', | |
1292 suri(bucket_uri, 'foo')], | |
1293 stdin='a' * 512 * ONE_KIB, return_stderr=True) | |
1294 self.assertIn('Retrying', stderr) | |
1295 | |
1296 @SkipForS3('No resumable upload support for S3.') | |
1297 def test_cp_resumable_upload(self): | |
1298 """Tests that a basic resumable upload completes successfully.""" | |
1299 bucket_uri = self.CreateBucket() | |
1300 fpath = self.CreateTempFile(contents='a' * self.halt_size) | |
1301 boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) | |
1302 with SetBotoConfigForTest([boto_config_for_test]): | |
1303 self.RunGsUtil(['cp', fpath, suri(bucket_uri)]) | |
1304 | |
1305 @SkipForS3('No resumable upload support for S3.') | |
1306 def test_resumable_upload_break_leaves_tracker(self): | |
1307 """Tests that a tracker file is created with a resumable upload.""" | |
1308 bucket_uri = self.CreateBucket() | |
1309 fpath = self.CreateTempFile(file_name='foo', | |
1310 contents='a' * self.halt_size) | |
1311 boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) | |
1312 with SetBotoConfigForTest([boto_config_for_test]): | |
1313 tracker_filename = GetTrackerFilePath( | |
1314 StorageUrlFromString(suri(bucket_uri, 'foo')), | |
1315 TrackerFileType.UPLOAD, self.test_api) | |
1316 test_callback_file = self.CreateTempFile( | |
1317 contents=pickle.dumps(_HaltingCopyCallbackHandler(True, 5))) | |
1318 try: | |
1319 stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file, | |
1320 fpath, suri(bucket_uri, 'foo')], | |
1321 expected_status=1, return_stderr=True) | |
1322 self.assertIn('Artifically halting upload', stderr) | |
1323 self.assertTrue(os.path.exists(tracker_filename), | |
1324 'Tracker file %s not present.' % tracker_filename) | |
1325 finally: | |
1326 if os.path.exists(tracker_filename): | |
1327 os.unlink(tracker_filename) | |
1328 | |
1329 @SkipForS3('No resumable upload support for S3.') | |
1330 def test_cp_resumable_upload_break_file_size_change(self): | |
1331 """Tests a resumable upload where the uploaded file changes size. | |
1332 | |
1333 This should fail when we read the tracker data. | |
1334 """ | |
1335 bucket_uri = self.CreateBucket() | |
1336 tmp_dir = self.CreateTempDir() | |
1337 fpath = self.CreateTempFile(file_name='foo', tmpdir=tmp_dir, | |
1338 contents='a' * self.halt_size) | |
1339 test_callback_file = self.CreateTempFile( | |
1340 contents=pickle.dumps(_HaltingCopyCallbackHandler(True, 5))) | |
1341 | |
1342 boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) | |
1343 with SetBotoConfigForTest([boto_config_for_test]): | |
1344 stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file, | |
1345 fpath, suri(bucket_uri)], | |
1346 expected_status=1, return_stderr=True) | |
1347 self.assertIn('Artifically halting upload', stderr) | |
1348 fpath = self.CreateTempFile(file_name='foo', tmpdir=tmp_dir, | |
1349 contents='a' * self.halt_size * 2) | |
1350 stderr = self.RunGsUtil(['cp', fpath, suri(bucket_uri)], | |
1351 expected_status=1, return_stderr=True) | |
1352 self.assertIn('ResumableUploadAbortException', stderr) | |
1353 | |
1354 @SkipForS3('No resumable upload support for S3.') | |
1355 def test_cp_resumable_upload_break_file_content_change(self): | |
1356 """Tests a resumable upload where the uploaded file changes content.""" | |
1357 if self.test_api == ApiSelector.XML: | |
1358 return unittest.skip( | |
1359 'XML doesn\'t make separate HTTP calls at fixed-size boundaries for ' | |
1360 'resumable uploads, so we can\'t guarantee that the server saves a ' | |
1361 'specific part of the upload.') | |
1362 bucket_uri = self.CreateBucket() | |
1363 tmp_dir = self.CreateTempDir() | |
1364 fpath = self.CreateTempFile(file_name='foo', tmpdir=tmp_dir, | |
1365 contents='a' * ONE_KIB * 512) | |
1366 test_callback_file = self.CreateTempFile( | |
1367 contents=pickle.dumps(_HaltingCopyCallbackHandler(True, | |
1368 int(ONE_KIB) * 384))) | |
1369 resumable_threshold_for_test = ( | |
1370 'GSUtil', 'resumable_threshold', str(ONE_KIB)) | |
1371 resumable_chunk_size_for_test = ( | |
1372 'GSUtil', 'json_resumable_chunk_size', str(ONE_KIB * 256)) | |
1373 with SetBotoConfigForTest([resumable_threshold_for_test, | |
1374 resumable_chunk_size_for_test]): | |
1375 stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file, | |
1376 fpath, suri(bucket_uri)], | |
1377 expected_status=1, return_stderr=True) | |
1378 self.assertIn('Artifically halting upload', stderr) | |
1379 fpath = self.CreateTempFile(file_name='foo', tmpdir=tmp_dir, | |
1380 contents='b' * ONE_KIB * 512) | |
1381 stderr = self.RunGsUtil(['cp', fpath, suri(bucket_uri)], | |
1382 expected_status=1, return_stderr=True) | |
1383 self.assertIn('doesn\'t match cloud-supplied digest', stderr) | |
1384 | |
1385 @SkipForS3('No resumable upload support for S3.') | |
1386 def test_cp_resumable_upload_break_file_smaller_size(self): | |
1387 """Tests a resumable upload where the uploaded file changes content. | |
1388 | |
1389 This should fail hash validation. | |
1390 """ | |
1391 bucket_uri = self.CreateBucket() | |
1392 tmp_dir = self.CreateTempDir() | |
1393 fpath = self.CreateTempFile(file_name='foo', tmpdir=tmp_dir, | |
1394 contents='a' * ONE_KIB * 512) | |
1395 test_callback_file = self.CreateTempFile( | |
1396 contents=pickle.dumps(_HaltingCopyCallbackHandler(True, | |
1397 int(ONE_KIB) * 384))) | |
1398 resumable_threshold_for_test = ( | |
1399 'GSUtil', 'resumable_threshold', str(ONE_KIB)) | |
1400 resumable_chunk_size_for_test = ( | |
1401 'GSUtil', 'json_resumable_chunk_size', str(ONE_KIB * 256)) | |
1402 with SetBotoConfigForTest([resumable_threshold_for_test, | |
1403 resumable_chunk_size_for_test]): | |
1404 stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file, | |
1405 fpath, suri(bucket_uri)], | |
1406 expected_status=1, return_stderr=True) | |
1407 self.assertIn('Artifically halting upload', stderr) | |
1408 fpath = self.CreateTempFile(file_name='foo', tmpdir=tmp_dir, | |
1409 contents='a' * ONE_KIB) | |
1410 stderr = self.RunGsUtil(['cp', fpath, suri(bucket_uri)], | |
1411 expected_status=1, return_stderr=True) | |
1412 self.assertIn('ResumableUploadAbortException', stderr) | |
1413 | |
1414 # This temporarily makes the tracker directory unwritable, which interferes | |
1415 # with any parallel-running tests that use the tracker directory. | |
1416 @NotParallelizable | |
1417 @SkipForS3('No resumable upload support for S3.') | |
1418 @unittest.skipIf(IS_WINDOWS, 'chmod on dir unsupported on Windows.') | |
1419 @PerformsFileToObjectUpload | |
1420 def test_cp_unwritable_tracker_file(self): | |
1421 """Tests a resumable upload with an unwritable tracker file.""" | |
1422 bucket_uri = self.CreateBucket() | |
1423 tracker_filename = GetTrackerFilePath( | |
1424 StorageUrlFromString(suri(bucket_uri, 'foo')), | |
1425 TrackerFileType.UPLOAD, self.test_api) | |
1426 tracker_dir = os.path.dirname(tracker_filename) | |
1427 fpath = self.CreateTempFile(file_name='foo', contents='a' * ONE_KIB) | |
1428 boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) | |
1429 save_mod = os.stat(tracker_dir).st_mode | |
1430 | |
1431 try: | |
1432 os.chmod(tracker_dir, 0) | |
1433 with SetBotoConfigForTest([boto_config_for_test]): | |
1434 stderr = self.RunGsUtil(['cp', fpath, suri(bucket_uri)], | |
1435 expected_status=1, return_stderr=True) | |
1436 self.assertIn('Couldn\'t write tracker file', stderr) | |
1437 finally: | |
1438 os.chmod(tracker_dir, save_mod) | |
1439 if os.path.exists(tracker_filename): | |
1440 os.unlink(tracker_filename) | |
1441 | |
1442 # This temporarily makes the tracker directory unwritable, which interferes | |
1443 # with any parallel-running tests that use the tracker directory. | |
1444 @NotParallelizable | |
1445 @unittest.skipIf(IS_WINDOWS, 'chmod on dir unsupported on Windows.') | |
1446 def test_cp_unwritable_tracker_file_download(self): | |
1447 """Tests downloads with an unwritable tracker file.""" | |
1448 object_uri = self.CreateObject(contents='foo' * ONE_KIB) | |
1449 tracker_filename = GetTrackerFilePath( | |
1450 StorageUrlFromString(suri(object_uri)), | |
1451 TrackerFileType.DOWNLOAD, self.test_api) | |
1452 tracker_dir = os.path.dirname(tracker_filename) | |
1453 fpath = self.CreateTempFile() | |
1454 save_mod = os.stat(tracker_dir).st_mode | |
1455 | |
1456 try: | |
1457 os.chmod(tracker_dir, 0) | |
1458 boto_config_for_test = ('GSUtil', 'resumable_threshold', str(EIGHT_MIB)) | |
1459 with SetBotoConfigForTest([boto_config_for_test]): | |
1460 # Should succeed because we are below the threshold. | |
1461 self.RunGsUtil(['cp', suri(object_uri), fpath]) | |
1462 boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) | |
1463 with SetBotoConfigForTest([boto_config_for_test]): | |
1464 stderr = self.RunGsUtil(['cp', suri(object_uri), fpath], | |
1465 expected_status=1, return_stderr=True) | |
1466 self.assertIn('Couldn\'t write tracker file', stderr) | |
1467 finally: | |
1468 os.chmod(tracker_dir, save_mod) | |
1469 if os.path.exists(tracker_filename): | |
1470 os.unlink(tracker_filename) | |
1471 | |
1472 def test_cp_resumable_download_break(self): | |
1473 """Tests that a download can be resumed after a connection break.""" | |
1474 bucket_uri = self.CreateBucket() | |
1475 object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', | |
1476 contents='a' * self.halt_size) | |
1477 fpath = self.CreateTempFile() | |
1478 test_callback_file = self.CreateTempFile( | |
1479 contents=pickle.dumps(_HaltingCopyCallbackHandler(False, 5))) | |
1480 | |
1481 boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) | |
1482 with SetBotoConfigForTest([boto_config_for_test]): | |
1483 stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file, | |
1484 suri(object_uri), fpath], | |
1485 expected_status=1, return_stderr=True) | |
1486 self.assertIn('Artifically halting download.', stderr) | |
1487 tracker_filename = GetTrackerFilePath( | |
1488 StorageUrlFromString(fpath), TrackerFileType.DOWNLOAD, self.test_api) | |
1489 self.assertTrue(os.path.isfile(tracker_filename)) | |
1490 stderr = self.RunGsUtil(['cp', suri(object_uri), fpath], | |
1491 return_stderr=True) | |
1492 self.assertIn('Resuming download', stderr) | |
1493 | |
1494 def test_cp_resumable_download_etag_differs(self): | |
1495 """Tests that download restarts the file when the source object changes. | |
1496 | |
1497 This causes the etag not to match. | |
1498 """ | |
1499 bucket_uri = self.CreateBucket() | |
1500 object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', | |
1501 contents='a' * self.halt_size) | |
1502 fpath = self.CreateTempFile() | |
1503 test_callback_file = self.CreateTempFile( | |
1504 contents=pickle.dumps(_HaltingCopyCallbackHandler(False, 5))) | |
1505 boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) | |
1506 with SetBotoConfigForTest([boto_config_for_test]): | |
1507 # This will create a tracker file with an ETag. | |
1508 stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file, | |
1509 suri(object_uri), fpath], | |
1510 expected_status=1, return_stderr=True) | |
1511 self.assertIn('Artifically halting download.', stderr) | |
1512 # Create a new object with different contents - it should have a | |
1513 # different ETag since the content has changed. | |
1514 object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', | |
1515 contents='b' * self.halt_size) | |
1516 stderr = self.RunGsUtil(['cp', suri(object_uri), fpath], | |
1517 return_stderr=True) | |
1518 self.assertNotIn('Resuming download', stderr) | |
1519 | |
1520 def test_cp_resumable_download_file_larger(self): | |
1521 """Tests download deletes the tracker file when existing file is larger.""" | |
1522 bucket_uri = self.CreateBucket() | |
1523 fpath = self.CreateTempFile() | |
1524 object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', | |
1525 contents='a' * self.halt_size) | |
1526 test_callback_file = self.CreateTempFile( | |
1527 contents=pickle.dumps(_HaltingCopyCallbackHandler(False, 5))) | |
1528 boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) | |
1529 with SetBotoConfigForTest([boto_config_for_test]): | |
1530 stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file, | |
1531 suri(object_uri), fpath], | |
1532 expected_status=1, return_stderr=True) | |
1533 self.assertIn('Artifically halting download.', stderr) | |
1534 with open(fpath, 'w') as larger_file: | |
1535 for _ in range(self.halt_size * 2): | |
1536 larger_file.write('a') | |
1537 stderr = self.RunGsUtil(['cp', suri(object_uri), fpath], | |
1538 expected_status=1, return_stderr=True) | |
1539 self.assertNotIn('Resuming download', stderr) | |
1540 self.assertIn('is larger', stderr) | |
1541 self.assertIn('Deleting tracker file', stderr) | |
1542 | |
1543 def test_cp_resumable_download_content_differs(self): | |
1544 """Tests that we do not re-download when tracker file matches existing file. | |
1545 | |
1546 We only compare size, not contents, so re-download should not occur even | |
1547 though the contents are technically different. However, hash validation on | |
1548 the file should still occur and we will delete the file then because | |
1549 the hashes differ. | |
1550 """ | |
1551 bucket_uri = self.CreateBucket() | |
1552 tmp_dir = self.CreateTempDir() | |
1553 fpath = self.CreateTempFile(tmpdir=tmp_dir, contents='abcd' * ONE_KIB) | |
1554 object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', | |
1555 contents='efgh' * ONE_KIB) | |
1556 stdout = self.RunGsUtil(['ls', '-L', suri(object_uri)], return_stdout=True) | |
1557 etag_match = re.search(r'\s*ETag:\s*(.*)', stdout) | |
1558 self.assertIsNotNone(etag_match, 'Could not get object ETag') | |
1559 self.assertEqual(len(etag_match.groups()), 1, | |
1560 'Did not match expected single ETag') | |
1561 etag = etag_match.group(1) | |
1562 | |
1563 tracker_filename = GetTrackerFilePath( | |
1564 StorageUrlFromString(fpath), TrackerFileType.DOWNLOAD, self.test_api) | |
1565 try: | |
1566 with open(tracker_filename, 'w') as tracker_fp: | |
1567 tracker_fp.write(etag) | |
1568 boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) | |
1569 with SetBotoConfigForTest([boto_config_for_test]): | |
1570 stderr = self.RunGsUtil(['cp', suri(object_uri), fpath], | |
1571 return_stderr=True, expected_status=1) | |
1572 self.assertIn('Download already complete for file', stderr) | |
1573 self.assertIn('doesn\'t match cloud-supplied digest', stderr) | |
1574 # File and tracker file should be deleted. | |
1575 self.assertFalse(os.path.isfile(fpath)) | |
1576 self.assertFalse(os.path.isfile(tracker_filename)) | |
1577 finally: | |
1578 if os.path.exists(tracker_filename): | |
1579 os.unlink(tracker_filename) | |
1580 | |
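  # Illustrative sketch only, not part of the original suite: the hash
  # validation described in the docstring above boils down to computing an
  # MD5 over the local file's contents, which the imported
  # CalculateMd5FromContents helper does. The file contents and the method
  # name here are hypothetical, and this helper is never invoked by the tests.
  def _example_local_md5_sketch(self):
    fpath = self.CreateTempFile(contents='abcd' * ONE_KIB)
    with open(fpath, 'rb') as fp:
      local_md5 = CalculateMd5FromContents(fp)
    # Only check that some digest came back; its exact format is left alone.
    self.assertTrue(local_md5)
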
1581 def test_cp_resumable_download_content_matches(self): | |
1582 """Tests download no-ops when tracker file matches existing file.""" | |
1583 bucket_uri = self.CreateBucket() | |
1584 tmp_dir = self.CreateTempDir() | |
1585 matching_contents = 'abcd' * ONE_KIB | |
1586 fpath = self.CreateTempFile(tmpdir=tmp_dir, contents=matching_contents) | |
1587 object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', | |
1588 contents=matching_contents) | |
1589 stdout = self.RunGsUtil(['ls', '-L', suri(object_uri)], return_stdout=True) | |
1590 etag_match = re.search(r'\s*ETag:\s*(.*)', stdout) | |
1591 self.assertIsNotNone(etag_match, 'Could not get object ETag') | |
1592 self.assertEqual(len(etag_match.groups()), 1, | |
1593 'Did not match expected single ETag') | |
1594 etag = etag_match.group(1) | |
1595 tracker_filename = GetTrackerFilePath( | |
1596 StorageUrlFromString(fpath), TrackerFileType.DOWNLOAD, self.test_api) | |
1597 with open(tracker_filename, 'w') as tracker_fp: | |
1598 tracker_fp.write(etag) | |
1599 try: | |
1600 boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) | |
1601 with SetBotoConfigForTest([boto_config_for_test]): | |
1602 stderr = self.RunGsUtil(['cp', suri(object_uri), fpath], | |
1603 return_stderr=True) | |
1604 self.assertIn('Download already complete for file', stderr) | |
1605 # Tracker file should be removed after successful hash validation. | |
1606 self.assertFalse(os.path.isfile(tracker_filename)) | |
1607 finally: | |
1608 if os.path.exists(tracker_filename): | |
1609 os.unlink(tracker_filename) | |
1610 | |
1611 def test_cp_resumable_download_tracker_file_not_matches(self): | |
1612 """Tests that download overwrites when tracker file etag does not match.""" | |
1613 bucket_uri = self.CreateBucket() | |
1614 tmp_dir = self.CreateTempDir() | |
1615 fpath = self.CreateTempFile(tmpdir=tmp_dir, contents='abcd' * ONE_KIB) | |
1616 object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', | |
1617 contents='efgh' * ONE_KIB) | |
1618 stdout = self.RunGsUtil(['ls', '-L', suri(object_uri)], return_stdout=True) | |
1619 etag_match = re.search(r'\s*ETag:\s*(.*)', stdout) | |
1620 self.assertIsNotNone(etag_match, 'Could not get object ETag') | |
1621 self.assertEqual(len(etag_match.groups()), 1, | |
1622 'Did not match regex for exactly one object ETag') | |
1623 etag = etag_match.group(1) | |
1624 etag += 'nonmatching' | |
1625 tracker_filename = GetTrackerFilePath( | |
1626 StorageUrlFromString(fpath), TrackerFileType.DOWNLOAD, self.test_api) | |
1627 with open(tracker_filename, 'w') as tracker_fp: | |
1628 tracker_fp.write(etag) | |
1629 try: | |
1630 boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) | |
1631 with SetBotoConfigForTest([boto_config_for_test]): | |
1632 stderr = self.RunGsUtil(['cp', suri(object_uri), fpath], | |
1633 return_stderr=True) | |
1634 self.assertNotIn('Resuming download', stderr) | |
1635 # Ensure the file was overwritten. | |
1636 with open(fpath, 'r') as in_fp: | |
1637 contents = in_fp.read() | |
1638 self.assertEqual(contents, 'efgh' * ONE_KIB, | |
1639 'File not overwritten when it should have been ' | |
1640 'due to a non-matching tracker file.') | |
1641 self.assertFalse(os.path.isfile(tracker_filename)) | |
1642 finally: | |
1643 if os.path.exists(tracker_filename): | |
1644 os.unlink(tracker_filename) | |
1645 | |
1646 def test_cp_resumable_download_gzip(self): | |
1647 """Tests that download can be resumed successfully with a gzipped file.""" | |
1648 # Generate some reasonably incompressible data. This compresses to around | |
1649 # 128KiB in practice, but we assert specifically below that it is | |
1650 # larger than self.halt_size to guarantee that we can halt the download | |
1651 # partway through. | |
1652 object_uri = self.CreateObject() | |
1653 random.seed(0) | |
1654 contents = str([random.choice(string.ascii_letters) | |
1655 for _ in xrange(ONE_KIB * 128)]) | |
1656 random.seed() # Reset the seed for any other tests. | |
1657 fpath1 = self.CreateTempFile(file_name='unzipped.txt', contents=contents) | |
1658 self.RunGsUtil(['cp', '-z', 'txt', suri(fpath1), suri(object_uri)]) | |
1659 | |
1660 # Use @Retry as hedge against bucket listing eventual consistency. | |
1661 @Retry(AssertionError, tries=3, timeout_secs=1) | |
1662 def _GetObjectSize(): | |
1663 stdout = self.RunGsUtil(['du', suri(object_uri)], return_stdout=True) | |
1664 size_match = re.search(r'(\d+)\s+.*', stdout) | |
1665 self.assertIsNotNone(size_match, 'Could not get object size') | |
1666 self.assertEqual(len(size_match.groups()), 1, | |
1667 'Did not match regex for exactly one object size.') | |
1668 return long(size_match.group(1)) | |
1669 | |
1670 object_size = _GetObjectSize() | |
1671 self.assertGreaterEqual(object_size, self.halt_size, | |
1672 'Compressed object size was not large enough to ' | |
1673 'allow for a halted download, so the test results ' | |
1674 'would be invalid. Please increase the compressed ' | |
1675 'object size in the test.') | |
1676 fpath2 = self.CreateTempFile() | |
1677 test_callback_file = self.CreateTempFile( | |
1678 contents=pickle.dumps(_HaltingCopyCallbackHandler(False, 5))) | |
1679 | |
1680 boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) | |
1681 with SetBotoConfigForTest([boto_config_for_test]): | |
1682 stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file, | |
1683 suri(object_uri), suri(fpath2)], | |
1684 return_stderr=True, expected_status=1) | |
1685 self.assertIn('Artifically halting download.', stderr) | |
1686 tracker_filename = GetTrackerFilePath( | |
1687 StorageUrlFromString(fpath2), TrackerFileType.DOWNLOAD, self.test_api) | |
1688 self.assertTrue(os.path.isfile(tracker_filename)) | |
1689 self.assertIn('Downloading to temp gzip filename', stderr) | |
1690 # We should have a temporary gzipped file, a tracker file, and no | |
1691 # final file yet. | |
1692 self.assertTrue(os.path.isfile('%s_.gztmp' % fpath2)) | |
1693 stderr = self.RunGsUtil(['cp', suri(object_uri), suri(fpath2)], | |
1694 return_stderr=True) | |
1695 self.assertIn('Resuming download', stderr) | |
1696 with open(fpath2, 'r') as f: | |
1697 self.assertEqual(f.read(), contents, 'File contents did not match.') | |
1698 self.assertFalse(os.path.isfile(tracker_filename)) | |
1699 self.assertFalse(os.path.isfile('%s_.gztmp' % fpath2)) | |
1700 | |
1701 @SkipForS3('No resumable upload support for S3.') | |
1702 def test_cp_resumable_upload_bucket_deleted(self): | |
1703 """Tests that a not found exception is raised if bucket no longer exists.""" | |
1704 bucket_uri = self.CreateBucket() | |
1705 fpath = self.CreateTempFile(contents='a' * 2 * ONE_KIB) | |
1706 boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) | |
1707 test_callback_file = self.CreateTempFile( | |
1708 contents=pickle.dumps( | |
1709 _DeleteBucketThenStartOverCopyCallbackHandler(5, bucket_uri))) | |
1710 | |
1711 with SetBotoConfigForTest([boto_config_for_test]): | |
1712 stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file, | |
1713 fpath, suri(bucket_uri)], return_stderr=True, | |
1714 expected_status=1) | |
1715 self.assertIn('Deleting bucket', stderr) | |
1716 self.assertIn('bucket does not exist', stderr) | |
1717 | |
1718 @SkipForS3('No resumable upload support for S3.') | |
1719 def test_cp_resumable_upload_start_over_http_error(self): | |
1720 for start_over_error in (404, 410): | |
1721 self.start_over_error_test_helper(start_over_error) | |
1722 | |
1723 def start_over_error_test_helper(self, http_error_num): | |
1724 bucket_uri = self.CreateBucket() | |
1725 fpath = self.CreateTempFile(contents='a' * 2 * ONE_KIB) | |
1726 boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) | |
1727 if self.test_api == ApiSelector.JSON: | |
1728 test_callback_file = self.CreateTempFile( | |
1729 contents=pickle.dumps(_JSONForceHTTPErrorCopyCallbackHandler(5, http_error_num))) | |
1730 elif self.test_api == ApiSelector.XML: | |
1731 test_callback_file = self.CreateTempFile( | |
1732 contents=pickle.dumps( | |
1733 _XMLResumableUploadStartOverCopyCallbackHandler(5))) | |
1734 | |
1735 with SetBotoConfigForTest([boto_config_for_test]): | |
1736 stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file, | |
1737 fpath, suri(bucket_uri)], return_stderr=True) | |
1738 self.assertIn('Restarting upload from scratch', stderr) | |
1739 | |
1740 def test_cp_minus_c(self): | |
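    """Tests that cp -c continues past a failed source and copies the rest."""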
1741 bucket_uri = self.CreateBucket() | |
1742 object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', | |
1743 contents='foo') | |
1744 self.RunGsUtil( | |
1745 ['cp', '-c', suri(bucket_uri) + '/foo2', suri(object_uri), | |
1746 suri(bucket_uri) + '/dir/'], | |
1747 expected_status=1) | |
1748 self.RunGsUtil(['stat', '%s/dir/foo' % suri(bucket_uri)]) | |
1749 | |
1750 def test_rewrite_cp(self): | |
1751 """Tests the JSON Rewrite API.""" | |
1752 if self.test_api == ApiSelector.XML: | |
1753 return self.skipTest('Rewrite API is only supported in JSON.') | |
1754 bucket_uri = self.CreateBucket() | |
1755 object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', | |
1756 contents='bar') | |
1757 gsutil_api = GcsJsonApi(BucketStorageUri, logging.getLogger(), | |
1758 self.default_provider) | |
1759 key = object_uri.get_key() | |
1760 src_obj_metadata = apitools_messages.Object( | |
1761 name=key.name, bucket=key.bucket.name, contentType=key.content_type) | |
1762 dst_obj_metadata = apitools_messages.Object( | |
1763 bucket=src_obj_metadata.bucket, | |
1764 name=self.MakeTempName('object'), | |
1765 contentType=src_obj_metadata.contentType) | |
1766 gsutil_api.CopyObject(src_obj_metadata, dst_obj_metadata) | |
1767 self.assertEqual( | |
1768 gsutil_api.GetObjectMetadata(src_obj_metadata.bucket, | |
1769 src_obj_metadata.name, | |
1770 fields=['md5Hash']).md5Hash, | |
1771 gsutil_api.GetObjectMetadata(dst_obj_metadata.bucket, | |
1772 dst_obj_metadata.name, | |
1773 fields=['md5Hash']).md5Hash, | |
1774 'Error: Rewritten object\'s hash doesn\'t match source object.') | |
1775 | |
1776 def test_rewrite_cp_resume(self): | |
1777 """Tests the JSON Rewrite API, breaking and resuming via a tracker file.""" | |
1778 if self.test_api == ApiSelector.XML: | |
1779 return self.skipTest('Rewrite API is only supported in JSON.') | |
1780 bucket_uri = self.CreateBucket() | |
1781 # Second bucket needs to be a different storage class so the service | |
1782 # actually rewrites the bytes. | |
1783 bucket_uri2 = self.CreateBucket( | |
1784 storage_class='DURABLE_REDUCED_AVAILABILITY') | |
1785 # maxBytesPerCall must be >= 1 MiB, so create an object > 2 MiB because we | |
1786 # need 2 responses from the service: 1 success, 1 failure prior to | |
1787 # completion. | |
1788 object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', | |
1789 contents=('12'*ONE_MIB) + 'bar', | |
1790 prefer_json_api=True) | |
1791 gsutil_api = GcsJsonApi(BucketStorageUri, logging.getLogger(), | |
1792 self.default_provider) | |
1793 key = object_uri.get_key() | |
1794 src_obj_metadata = apitools_messages.Object( | |
1795 name=key.name, bucket=key.bucket.name, contentType=key.content_type, | |
1796 etag=key.etag.strip('"\'')) | |
1797 dst_obj_name = self.MakeTempName('object') | |
1798 dst_obj_metadata = apitools_messages.Object( | |
1799 bucket=bucket_uri2.bucket_name, | |
1800 name=dst_obj_name, | |
1801 contentType=src_obj_metadata.contentType) | |
1802 tracker_file_name = GetRewriteTrackerFilePath( | |
1803 src_obj_metadata.bucket, src_obj_metadata.name, | |
1804 dst_obj_metadata.bucket, dst_obj_metadata.name, self.test_api) | |
1805 try: | |
1806 try: | |
1807 gsutil_api.CopyObject( | |
1808 src_obj_metadata, dst_obj_metadata, | |
1809 progress_callback=_HaltingRewriteCallbackHandler(ONE_MIB*2).call, | |
1810 max_bytes_per_call=ONE_MIB) | |
1811 self.fail('Expected _RewriteHaltException.') | |
1812 except _RewriteHaltException: | |
1813 pass | |
1814 | |
1815 # Tracker file should be left over. | |
1816 self.assertTrue(os.path.exists(tracker_file_name)) | |
1817 | |
1818 # Now resume. Callback ensures we didn't start over. | |
1819 gsutil_api.CopyObject( | |
1820 src_obj_metadata, dst_obj_metadata, | |
1821 progress_callback=_EnsureRewriteResumeCallbackHandler(ONE_MIB*2).call, | |
1822 max_bytes_per_call=ONE_MIB) | |
1823 | |
1824 # Copy completed; tracker file should be deleted. | |
1825 self.assertFalse(os.path.exists(tracker_file_name)) | |
1826 | |
1827 self.assertEqual( | |
1828 gsutil_api.GetObjectMetadata(src_obj_metadata.bucket, | |
1829 src_obj_metadata.name, | |
1830 fields=['md5Hash']).md5Hash, | |
1831 gsutil_api.GetObjectMetadata(dst_obj_metadata.bucket, | |
1832 dst_obj_metadata.name, | |
1833 fields=['md5Hash']).md5Hash, | |
1834 'Error: Rewritten object\'s hash doesn\'t match source object.') | |
1835 finally: | |
1836 # Clean up if something went wrong. | |
1837 DeleteTrackerFile(tracker_file_name) | |
1838 | |
1839 def test_rewrite_cp_resume_source_changed(self): | |
1840 """Tests that Rewrite starts over when the source object has changed.""" | |
1841 if self.test_api == ApiSelector.XML: | |
1842 return self.skipTest('Rewrite API is only supported in JSON.') | |
1843 bucket_uri = self.CreateBucket() | |
1844 # Second bucket needs to be a different storage class so the service | |
1845 # actually rewrites the bytes. | |
1846 bucket_uri2 = self.CreateBucket( | |
1847 storage_class='DURABLE_REDUCED_AVAILABILITY') | |
1848 # maxBytesPerCall must be >= 1 MiB, so create an object > 2 MiB because we | |
1849 # need 2 responses from the service: 1 success, 1 failure prior to | |
1850 # completion. | |
1851 object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', | |
1852 contents=('12'*ONE_MIB) + 'bar', | |
1853 prefer_json_api=True) | |
1854 gsutil_api = GcsJsonApi(BucketStorageUri, logging.getLogger(), | |
1855 self.default_provider) | |
1856 key = object_uri.get_key() | |
1857 src_obj_metadata = apitools_messages.Object( | |
1858 name=key.name, bucket=key.bucket.name, contentType=key.content_type, | |
1859 etag=key.etag.strip('"\'')) | |
1860 dst_obj_name = self.MakeTempName('object') | |
1861 dst_obj_metadata = apitools_messages.Object( | |
1862 bucket=bucket_uri2.bucket_name, | |
1863 name=dst_obj_name, | |
1864 contentType=src_obj_metadata.contentType) | |
1865 tracker_file_name = GetRewriteTrackerFilePath( | |
1866 src_obj_metadata.bucket, src_obj_metadata.name, | |
1867 dst_obj_metadata.bucket, dst_obj_metadata.name, self.test_api) | |
1868 try: | |
1869 try: | |
1870 gsutil_api.CopyObject( | |
1871 src_obj_metadata, dst_obj_metadata, | |
1872 progress_callback=_HaltingRewriteCallbackHandler(ONE_MIB*2).call, | |
1873 max_bytes_per_call=ONE_MIB) | |
1874 self.fail('Expected _RewriteHaltException.') | |
1875 except _RewriteHaltException: | |
1876 pass | |
1877 # Overwrite the original object. | |
1878 object_uri2 = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', | |
1879 contents='bar', prefer_json_api=True) | |
1880 key2 = object_uri2.get_key() | |
1881 src_obj_metadata2 = apitools_messages.Object( | |
1882 name=key2.name, bucket=key2.bucket.name, | |
1883 contentType=key2.content_type, etag=key2.etag.strip('"\'')) | |
1884 | |
1885 # Tracker file for original object should still exist. | |
1886 self.assertTrue(os.path.exists(tracker_file_name)) | |
1887 | |
1888 # Copy the new object. | |
1889 gsutil_api.CopyObject(src_obj_metadata2, dst_obj_metadata, | |
1890 max_bytes_per_call=ONE_MIB) | |
1891 | |
1892 # Copy completed; original tracker file should be deleted. | |
1893 self.assertFalse(os.path.exists(tracker_file_name)) | |
1894 | |
1895 self.assertEqual( | |
1896 gsutil_api.GetObjectMetadata(src_obj_metadata2.bucket, | |
1897 src_obj_metadata2.name, | |
1898 fields=['md5Hash']).md5Hash, | |
1899 gsutil_api.GetObjectMetadata(dst_obj_metadata.bucket, | |
1900 dst_obj_metadata.name, | |
1901 fields=['md5Hash']).md5Hash, | |
1902 'Error: Rewritten object\'s hash doesn\'t match source object.') | |
1903 finally: | |
1904 # Clean up if something went wrong. | |
1905 DeleteTrackerFile(tracker_file_name) | |
1906 | |
1907 def test_rewrite_cp_resume_command_changed(self): | |
1908 """Tests that Rewrite starts over when the arguments changed.""" | |
1909 if self.test_api == ApiSelector.XML: | |
1910 return self.skipTest('Rewrite API is only supported in JSON.') | |
1911 bucket_uri = self.CreateBucket() | |
1912 # Second bucket needs to be a different storage class so the service | |
1913 # actually rewrites the bytes. | |
1914 bucket_uri2 = self.CreateBucket( | |
1915 storage_class='DURABLE_REDUCED_AVAILABILITY') | |
1916 # maxBytesPerCall must be >= 1 MiB, so create an object > 2 MiB because we | |
1917 # need 2 responses from the service: 1 success, 1 failure prior to | |
1918 # completion. | |
1919 object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', | |
1920 contents=('12'*ONE_MIB) + 'bar', | |
1921 prefer_json_api=True) | |
1922 gsutil_api = GcsJsonApi(BucketStorageUri, logging.getLogger(), | |
1923 self.default_provider) | |
1924 key = object_uri.get_key() | |
1925 src_obj_metadata = apitools_messages.Object( | |
1926 name=key.name, bucket=key.bucket.name, contentType=key.content_type, | |
1927 etag=key.etag.strip('"\'')) | |
1928 dst_obj_name = self.MakeTempName('object') | |
1929 dst_obj_metadata = apitools_messages.Object( | |
1930 bucket=bucket_uri2.bucket_name, | |
1931 name=dst_obj_name, | |
1932 contentType=src_obj_metadata.contentType) | |
1933 tracker_file_name = GetRewriteTrackerFilePath( | |
1934 src_obj_metadata.bucket, src_obj_metadata.name, | |
1935 dst_obj_metadata.bucket, dst_obj_metadata.name, self.test_api) | |
1936 try: | |
1937 try: | |
1938 gsutil_api.CopyObject( | |
1939 src_obj_metadata, dst_obj_metadata, canned_acl='private', | |
1940 progress_callback=_HaltingRewriteCallbackHandler(ONE_MIB*2).call, | |
1941 max_bytes_per_call=ONE_MIB) | |
1942 self.fail('Expected _RewriteHaltException.') | |
1943 except _RewriteHaltException: | |
1944 pass | |
1945 | |
1946 # Tracker file for original object should still exist. | |
1947 self.assertTrue(os.path.exists(tracker_file_name)) | |
1948 | |
1949 # Copy the same object but with different call parameters. | |
1950 gsutil_api.CopyObject(src_obj_metadata, dst_obj_metadata, | |
1951 canned_acl='public-read', | |
1952 max_bytes_per_call=ONE_MIB) | |
1953 | |
1954 # Copy completed; original tracker file should be deleted. | |
1955 self.assertFalse(os.path.exists(tracker_file_name)) | |
1956 | |
1957 new_obj_metadata = gsutil_api.GetObjectMetadata( | |
1958 dst_obj_metadata.bucket, dst_obj_metadata.name, | |
1959 fields=['acl', 'md5Hash']) | |
1960 self.assertEqual( | |
1961 gsutil_api.GetObjectMetadata(src_obj_metadata.bucket, | |
1962 src_obj_metadata.name, | |
1963 fields=['md5Hash']).md5Hash, | |
1964 new_obj_metadata.md5Hash, | |
1965 'Error: Rewritten object\'s hash doesn\'t match source object.') | |
1966 # New object should have a public-read ACL from the second command. | |
1967 found_public_acl = False | |
1968 for acl_entry in new_obj_metadata.acl: | |
1969 if acl_entry.entity == 'allUsers': | |
1970 found_public_acl = True | |
1971 self.assertTrue(found_public_acl, | |
1972 'New object was not written with a public ACL.') | |
1973 finally: | |
1974 # Clean up if something went wrong. | |
1975 DeleteTrackerFile(tracker_file_name) | |
1976 | |
1977 | |
1978 class TestCpUnitTests(testcase.GsUtilUnitTestCase): | |
1979 """Unit tests for gsutil cp.""" | |
1980 | |
1981 def testDownloadWithNoHashAvailable(self): | |
1982 """Tests a download with no valid server-supplied hash.""" | |
1983 # S3 should have a special message for non-MD5 etags (see the sketch after this test). | |
1984 bucket_uri = self.CreateBucket(provider='s3') | |
1985 object_uri = self.CreateObject(bucket_uri=bucket_uri, contents='foo') | |
1986 object_uri.get_key().etag = '12345' # Not an MD5 | |
1987 dst_dir = self.CreateTempDir() | |
1988 | |
1989 log_handler = self.RunCommand( | |
1990 'cp', [suri(object_uri), dst_dir], return_log_handler=True) | |
1991 warning_messages = log_handler.messages['warning'] | |
1992 self.assertEqual(2, len(warning_messages)) | |
1993 self.assertRegexpMatches( | |
1994 warning_messages[0], | |
1995 r'Non-MD5 etag \(12345\) present for key .*, ' | |
1996 r'data integrity checks are not possible') | |
1997 self.assertIn('Integrity cannot be assured', warning_messages[1]) | |
1998 | |
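  # Illustrative sketch only, not part of the original suite: the warning
  # asserted above fires because an S3 multipart-upload ETag is not a plain
  # 32-character hex MD5 (it typically carries a '-<part count>' suffix).
  # The sample ETag and the method name below are hypothetical, and this
  # helper is never invoked by the tests.
  def _example_non_md5_etag_sketch(self):
    multipart_etag = 'd41d8cd98f00b204e9800998ecf8427e-2'
    self.assertIsNone(re.match(r'^[0-9a-f]{32}$', multipart_etag))
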
1999 def test_object_and_prefix_same_name(self): | |
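    """Tests that cp copies the object when an object and a prefix share a name."""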
2000 bucket_uri = self.CreateBucket() | |
2001 object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', | |
2002 contents='foo') | |
2003 self.CreateObject(bucket_uri=bucket_uri, | |
2004 object_name='foo/bar', contents='bar') | |
2005 fpath = self.CreateTempFile() | |
2006 # MockKey doesn't support hash_algs, so the MD5 will not match. | |
2007 with SetBotoConfigForTest([('GSUtil', 'check_hashes', 'never')]): | |
2008 self.RunCommand('cp', [suri(object_uri), fpath]) | |
2009 with open(fpath, 'r') as f: | |
2010 self.assertEqual(f.read(), 'foo') | |
2011 | |
2012 def test_cp_upload_respects_no_hashes(self): | |
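    """Tests that an upload warns when hash validation is disabled."""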
2013 bucket_uri = self.CreateBucket() | |
2014 fpath = self.CreateTempFile(contents='abcd') | |
2015 with SetBotoConfigForTest([('GSUtil', 'check_hashes', 'never')]): | |
2016 log_handler = self.RunCommand('cp', [fpath, suri(bucket_uri)], | |
2017 return_log_handler=True) | |
2018 warning_messages = log_handler.messages['warning'] | |
2019 self.assertEqual(1, len(warning_messages)) | |
2020 self.assertIn('Found no hashes to validate object upload', | |
2021 warning_messages[0]) | |
OLD | NEW |