Index: tools/telemetry/third_party/gsutilz/gslib/tests/test_cp.py |
diff --git a/tools/telemetry/third_party/gsutilz/gslib/tests/test_cp.py b/tools/telemetry/third_party/gsutilz/gslib/tests/test_cp.py |
index 7c44c366a614f71eb3c020660c5bf6876e0764b9..c216fb64a6a0345bfbf5f9e0b5ee8030ce1bec63 100644 |
--- a/tools/telemetry/third_party/gsutilz/gslib/tests/test_cp.py |
+++ b/tools/telemetry/third_party/gsutilz/gslib/tests/test_cp.py |
@@ -33,13 +33,14 @@ from apitools.base.py import exceptions as apitools_exceptions |
import boto |
from boto import storage_uri |
from boto.exception import ResumableTransferDisposition |
-from boto.exception import ResumableUploadException |
from boto.exception import StorageResponseError |
from boto.storage_uri import BucketStorageUri |
+import crcmod |
from gslib.cloud_api import ResumableDownloadException |
from gslib.cloud_api import ResumableUploadException |
from gslib.cloud_api import ResumableUploadStartOverException |
+from gslib.commands.config import DEFAULT_SLICED_OBJECT_DOWNLOAD_THRESHOLD |
from gslib.copy_helper import GetTrackerFilePath |
from gslib.copy_helper import TrackerFileType |
from gslib.cs_api_map import ApiSelector |
@@ -52,19 +53,22 @@ from gslib.tests.testcase.integration_testcase import SkipForS3 |
from gslib.tests.util import GenerationFromURI as urigen |
from gslib.tests.util import HAS_S3_CREDS |
from gslib.tests.util import ObjectToURI as suri |
-from gslib.tests.util import PerformsFileToObjectUpload |
+from gslib.tests.util import SequentialAndParallelTransfer |
from gslib.tests.util import SetBotoConfigForTest |
from gslib.tests.util import unittest |
from gslib.third_party.storage_apitools import storage_v1_messages as apitools_messages |
from gslib.tracker_file import DeleteTrackerFile |
from gslib.tracker_file import GetRewriteTrackerFilePath |
+from gslib.tracker_file import GetSlicedDownloadTrackerFilePaths |
from gslib.util import EIGHT_MIB |
+from gslib.util import HumanReadableToBytes |
from gslib.util import IS_WINDOWS |
from gslib.util import MakeHumanReadable |
from gslib.util import ONE_KIB |
from gslib.util import ONE_MIB |
from gslib.util import Retry |
from gslib.util import START_CALLBACK_PER_BYTES |
+from gslib.util import UsingCrcmodExtension |
from gslib.util import UTF8 |
@@ -139,6 +143,24 @@ class _XMLResumableUploadStartOverCopyCallbackHandler(object): |
ResumableTransferDisposition.START_OVER) |
+class _HaltOneComponentCopyCallbackHandler(object): |
+ """Test callback handler for stopping part of a sliced download.""" |
+ |
+ def __init__(self, halt_at_byte): |
+ self._last_progress_byte = None |
+ self._halt_at_byte = halt_at_byte |
+ |
+ # pylint: disable=invalid-name |
+ # pylint: disable=unused-argument |
+ def call(self, current_progress_byte, total_size_unused): |
+ """Forcibly exits if the passed the halting point since the last call.""" |
+ if (self._last_progress_byte is not None and |
+ self._last_progress_byte < self._halt_at_byte < current_progress_byte): |
+ sys.stderr.write('Halting transfer.\r\n') |
+ raise ResumableDownloadException('Artifically halting download.') |
+ self._last_progress_byte = current_progress_byte |
+ |
+ |
class _DeleteBucketThenStartOverCopyCallbackHandler(object): |
"""Test callback handler that deletes bucket then raises start-over.""" |
@@ -239,7 +261,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase): |
contents = pkgutil.get_data('gslib', 'tests/test_data/%s' % name) |
return self.CreateTempFile(file_name=name, contents=contents) |
- @PerformsFileToObjectUpload |
+ @SequentialAndParallelTransfer |
def test_noclobber(self): |
key_uri = self.CreateObject(contents='foo') |
fpath = self.CreateTempFile(contents='bar') |
@@ -259,7 +281,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase): |
'%s://%s' % (self.default_provider, self.nonexistent_bucket_name)) |
stderr = self.RunGsUtil(['cp', fpath, invalid_bucket_uri], |
expected_status=1, return_stderr=True) |
- self.assertIn('does not exist.', stderr) |
+ self.assertIn('does not exist', stderr) |
def test_copy_in_cloud_noclobber(self): |
bucket1_uri = self.CreateBucket() |
@@ -275,7 +297,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase): |
self.assertIn('Skipping existing item: %s' % |
suri(bucket2_uri, key_uri.object_name), stderr) |
- @PerformsFileToObjectUpload |
+ @SequentialAndParallelTransfer |
def test_streaming(self): |
bucket_uri = self.CreateBucket() |
stderr = self.RunGsUtil(['cp', '-', '%s' % suri(bucket_uri, 'foo')], |
@@ -293,7 +315,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase): |
# TODO: Implement a way to test both with and without using magic file. |
- @PerformsFileToObjectUpload |
+ @SequentialAndParallelTransfer |
def test_detect_content_type(self): |
"""Tests local detection of content type.""" |
bucket_uri = self.CreateBucket() |
@@ -375,7 +397,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase): |
_Check2() |
@unittest.skipIf(IS_WINDOWS, 'magicfile is not available on Windows.') |
- @PerformsFileToObjectUpload |
+ @SequentialAndParallelTransfer |
def test_magicfile_override(self): |
"""Tests content type override with magicfile value.""" |
bucket_uri = self.CreateBucket() |
@@ -393,7 +415,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase): |
self.assertRegexpMatches(stdout, r'Content-Type:\s+%s' % content_type) |
_Check1() |
- @PerformsFileToObjectUpload |
+ @SequentialAndParallelTransfer |
def test_content_type_mismatches(self): |
"""Tests overriding content type when it does not match the file type.""" |
bucket_uri = self.CreateBucket() |
@@ -429,7 +451,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase): |
self.assertRegexpMatches(stdout, r'Content-Type:\s+image/gif') |
_Check3() |
- @PerformsFileToObjectUpload |
+ @SequentialAndParallelTransfer |
def test_content_type_header_case_insensitive(self): |
"""Tests that content type header is treated with case insensitivity.""" |
bucket_uri = self.CreateBucket() |
@@ -459,7 +481,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase): |
self.assertNotRegexpMatches(stdout, r'image/gif,\s*image/gif') |
_Check2() |
- @PerformsFileToObjectUpload |
+ @SequentialAndParallelTransfer |
def test_other_headers(self): |
"""Tests that non-content-type headers are applied successfully on copy.""" |
bucket_uri = self.CreateBucket() |
@@ -481,7 +503,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase): |
self.assertRegexpMatches(stdout, r'Cache-Control\s*:\s*public,max-age=12') |
self.assertRegexpMatches(stdout, r'Metadata:\s*1:\s*abcd') |
- @PerformsFileToObjectUpload |
+ @SequentialAndParallelTransfer |
def test_versioning(self): |
"""Tests copy with versioning.""" |
bucket_uri = self.CreateVersionedBucket() |
@@ -524,11 +546,20 @@ class TestCp(testcase.GsUtilIntegrationTestCase): |
expected_status=1) |
self.assertIn('cannot be the destination for gsutil cp', stderr) |
+ def test_versioning_no_parallelism(self): |
+ """Tests that copy all-versions errors when parallelism is enabled.""" |
+ stderr = self.RunGsUtil( |
+ ['-m', 'cp', '-A', suri(self.nonexistent_bucket_name, 'foo'), |
+ suri(self.nonexistent_bucket_name, 'bar')], |
+ expected_status=1, return_stderr=True) |
+ self.assertIn('-m option is not supported with the cp -A flag', stderr) |
+ |
@SkipForS3('S3 lists versioned objects in reverse timestamp order.') |
def test_recursive_copying_versioned_bucket(self): |
- """Tests that cp -R with versioned buckets copies all versions in order.""" |
+ """Tests cp -R with versioned buckets.""" |
bucket1_uri = self.CreateVersionedBucket() |
bucket2_uri = self.CreateVersionedBucket() |
+ bucket3_uri = self.CreateVersionedBucket() |
# Write two versions of an object to the bucket1. |
self.CreateObject(bucket_uri=bucket1_uri, object_name='k', contents='data0') |
@@ -537,9 +568,12 @@ class TestCp(testcase.GsUtilIntegrationTestCase): |
self.AssertNObjectsInBucket(bucket1_uri, 2, versioned=True) |
self.AssertNObjectsInBucket(bucket2_uri, 0, versioned=True) |
+ self.AssertNObjectsInBucket(bucket3_uri, 0, versioned=True) |
# Recursively copy to second versioned bucket. |
- self.RunGsUtil(['cp', '-R', suri(bucket1_uri, '*'), suri(bucket2_uri)]) |
+ # The -A flag should copy all versions in order. |
+ self.RunGsUtil(['cp', '-R', '-A', suri(bucket1_uri, '*'), |
+ suri(bucket2_uri)]) |
# Use @Retry as hedge against bucket listing eventual consistency. |
@Retry(AssertionError, tries=3, timeout_secs=1) |
@@ -570,7 +604,31 @@ class TestCp(testcase.GsUtilIntegrationTestCase): |
self.assertEquals(storage_uri(uri_str2).object_name, 'k') |
_Check2() |
- @PerformsFileToObjectUpload |
+ # Recursively copy to the third versioned bucket without the -A flag. |
+ # This should copy only the live object. |
+ self.RunGsUtil(['cp', '-R', suri(bucket1_uri, '*'), |
+ suri(bucket3_uri)]) |
+ |
+ # Use @Retry as hedge against bucket listing eventual consistency. |
+ @Retry(AssertionError, tries=3, timeout_secs=1) |
+ def _Check3(): |
+ """Validates the results of the cp -R.""" |
+ listing1 = self.RunGsUtil(['ls', '-la', suri(bucket1_uri)], |
+ return_stdout=True).split('\n') |
+ listing2 = self.RunGsUtil(['ls', '-la', suri(bucket3_uri)], |
+ return_stdout=True).split('\n') |
+ # 2 lines of listing output, 1 summary line, 1 empty line from \n split. |
+ self.assertEquals(len(listing1), 4) |
+ # 1 line of listing output, 1 summary line, 1 empty line from \n split. |
+ self.assertEquals(len(listing2), 3) |
+ |
+ # The object in bucket 3 should match the live (second) version in bucket 1. |
+ size1, _, uri_str1, _ = listing2[0].split() |
+ self.assertEquals(size1, str(len('longer_data1'))) |
+ self.assertEquals(storage_uri(uri_str1).object_name, 'k') |
+ _Check3() |
+ |
+ @SequentialAndParallelTransfer |
@SkipForS3('Preconditions not supported for S3.') |
def test_cp_generation_zero_match(self): |
"""Tests that cp handles an object-not-exists precondition header.""" |
@@ -591,7 +649,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase): |
return_stderr=True, expected_status=1) |
self.assertIn('PreconditionException', stderr) |
- @PerformsFileToObjectUpload |
+ @SequentialAndParallelTransfer |
@SkipForS3('Preconditions not supported for S3.') |
def test_cp_v_generation_match(self): |
"""Tests that cp -v option handles the if-generation-match header.""" |
@@ -623,7 +681,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase): |
self.assertIn('Specifying x-goog-if-generation-match is not supported ' |
'with cp -n', stderr) |
- @PerformsFileToObjectUpload |
+ @SequentialAndParallelTransfer |
def test_cp_nv(self): |
"""Tests that cp -nv works when skipping existing file.""" |
bucket_uri = self.CreateVersionedBucket() |
@@ -640,7 +698,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase): |
return_stderr=True) |
self.assertIn('Skipping existing item:', stderr) |
- @PerformsFileToObjectUpload |
+ @SequentialAndParallelTransfer |
@SkipForS3('S3 lists versioned objects in reverse timestamp order.') |
def test_cp_v_option(self): |
""""Tests that cp -v returns the created object's version-specific URI.""" |
@@ -699,7 +757,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase): |
self.assertEqual(created_uri, lines[-2]) |
_Check1() |
- @PerformsFileToObjectUpload |
+ @SequentialAndParallelTransfer |
def test_stdin_args(self): |
"""Tests cp with the -I option.""" |
tmpdir = self.CreateTempDir() |
@@ -844,7 +902,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase): |
self.assertEqual(public_read_acl, new_acl_json) |
_Check() |
- @PerformsFileToObjectUpload |
+ @SequentialAndParallelTransfer |
def test_canned_acl_upload(self): |
"""Tests uploading a file with a canned ACL.""" |
bucket1_uri = self.CreateBucket() |
@@ -889,7 +947,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase): |
stdout = self.RunGsUtil(['cp', fpath, '-'], return_stdout=True) |
self.assertIn(contents, stdout) |
- @PerformsFileToObjectUpload |
+ @SequentialAndParallelTransfer |
def test_cp_zero_byte_file(self): |
dst_bucket_uri = self.CreateBucket() |
src_dir = self.CreateTempDir() |
@@ -909,7 +967,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase): |
self.assertTrue(os.stat(download_path)) |
def test_copy_bucket_to_bucket(self): |
- """Tests that recursively copying from bucket to bucket. |
+ """Tests recursively copying from bucket to bucket. |
This should produce identically named objects (and not, in particular, |
destination objects named by the version-specific URI from source objects). |
@@ -978,9 +1036,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase): |
key_uri.set_contents_from_string('') |
self.AssertNObjectsInBucket(src_bucket_uri, 3) |
- stderr = self.RunGsUtil(['cp', '-R', suri(src_bucket_uri), dst_dir], |
- return_stderr=True) |
- self.assertIn('Skipping cloud sub-directory placeholder object', stderr) |
+ self.RunGsUtil(['cp', '-R', suri(src_bucket_uri), dst_dir]) |
dir_list = [] |
for dirname, _, filenames in os.walk(dst_dir): |
for filename in filenames: |
@@ -1025,12 +1081,12 @@ class TestCp(testcase.GsUtilIntegrationTestCase): |
@unittest.skipIf(IS_WINDOWS, |
'Unicode handling on Windows requires mods to site-packages') |
- @PerformsFileToObjectUpload |
+ @SequentialAndParallelTransfer |
def test_cp_manifest_upload_unicode(self): |
return self._ManifestUpload('foo-unicöde', 'bar-unicöde', |
'manifest-unicöde') |
- @PerformsFileToObjectUpload |
+ @SequentialAndParallelTransfer |
def test_cp_manifest_upload(self): |
"""Tests uploading with a mnifest file.""" |
return self._ManifestUpload('foo', 'bar', 'manifest') |
@@ -1071,7 +1127,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase): |
self.assertEqual(int(results[7]), 3) # Bytes Transferred |
self.assertEqual(results[8], 'OK') # Result |
- @PerformsFileToObjectUpload |
+ @SequentialAndParallelTransfer |
def test_cp_manifest_download(self): |
"""Tests downloading with a manifest file.""" |
key_uri = self.CreateObject(contents='foo') |
@@ -1097,14 +1153,13 @@ class TestCp(testcase.GsUtilIntegrationTestCase): |
start_date = datetime.datetime.strptime(results[2], date_format) |
end_date = datetime.datetime.strptime(results[3], date_format) |
self.assertEqual(end_date > start_date, True) |
- self.assertEqual(results[4], 'rL0Y20zC+Fzt72VPzMSk2A==') # md5 |
self.assertEqual(int(results[6]), 3) # Source Size |
# Bytes transferred might be more than 3 if the file was gzipped, since |
# the minimum gzip header is 10 bytes. |
self.assertGreaterEqual(int(results[7]), 3) # Bytes Transferred |
self.assertEqual(results[8], 'OK') # Result |
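|
For reference (not part of the patch), the manifest CSV columns indexed by |
the assertions above and in the upload test, with indices not asserted in |
this file omitted: |
|
    # results[2] -> start time (parsed with date_format) |
    # results[3] -> end time |
    # results[4] -> MD5 digest (still asserted for uploads; this patch |
    #               drops the assertion from the download test) |
    # results[6] -> source size in bytes |
    # results[7] -> bytes transferred (may exceed source size for gzipped |
    #               content, since a gzip header is at least 10 bytes) |
    # results[8] -> result status, e.g. 'OK' |
|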
- @PerformsFileToObjectUpload |
+ @SequentialAndParallelTransfer |
def test_copy_unicode_non_ascii_filename(self): |
key_uri = self.CreateObject(contents='foo') |
# Make file large enough to cause a resumable upload (which hashes filename |
@@ -1122,6 +1177,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase): |
# such files (so, failed that test). Given that, we decided to remove the |
# test. |
+ @SequentialAndParallelTransfer |
def test_gzip_upload_and_download(self): |
bucket_uri = self.CreateBucket() |
contents = 'x' * 10000 |
@@ -1158,6 +1214,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase): |
self.assertIn('Copying file:', stderr) |
self.AssertNObjectsInBucket(bucket_uri, 1) |
+ @SequentialAndParallelTransfer |
def test_cp_object_ending_with_slash(self): |
"""Tests that cp works with object names ending with slash.""" |
tmpdir = self.CreateTempDir() |
@@ -1214,6 +1271,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase): |
self.RunGsUtil(['-m', 'cp', wildcard_uri, suri(bucket_uri)]) |
self.AssertNObjectsInBucket(bucket_uri, num_test_files) |
+ @SequentialAndParallelTransfer |
def test_cp_duplicate_source_args(self): |
"""Tests that cp -m works when a source argument is provided twice.""" |
object_contents = 'edge' |
@@ -1416,7 +1474,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase): |
@NotParallelizable |
@SkipForS3('No resumable upload support for S3.') |
@unittest.skipIf(IS_WINDOWS, 'chmod on dir unsupported on Windows.') |
- @PerformsFileToObjectUpload |
+ @SequentialAndParallelTransfer |
def test_cp_unwritable_tracker_file(self): |
"""Tests a resumable upload with an unwritable tracker file.""" |
bucket_uri = self.CreateBucket() |
@@ -1443,6 +1501,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase): |
# interferes with any parallel running tests that use the tracker directory. |
@NotParallelizable |
@unittest.skipIf(IS_WINDOWS, 'chmod on dir unsupported on Windows.') |
+ @SequentialAndParallelTransfer |
def test_cp_unwritable_tracker_file_download(self): |
"""Tests downloads with an unwritable tracker file.""" |
object_uri = self.CreateObject(contents='foo' * ONE_KIB) |
@@ -1491,6 +1550,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase): |
return_stderr=True) |
self.assertIn('Resuming download', stderr) |
+ @SequentialAndParallelTransfer |
def test_cp_resumable_download_etag_differs(self): |
"""Tests that download restarts the file when the source object changes. |
@@ -1498,7 +1558,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase): |
""" |
bucket_uri = self.CreateBucket() |
object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', |
- contents='a' * self.halt_size) |
+ contents='abc' * self.halt_size) |
fpath = self.CreateTempFile() |
test_callback_file = self.CreateTempFile( |
contents=pickle.dumps(_HaltingCopyCallbackHandler(False, 5))) |
@@ -1517,6 +1577,44 @@ class TestCp(testcase.GsUtilIntegrationTestCase): |
return_stderr=True) |
self.assertNotIn('Resuming download', stderr) |
+ # TODO: Enable this test for sequential downloads when their tracker files are |
+ # modified to contain the source object generation. |
+ @unittest.skipUnless(UsingCrcmodExtension(crcmod), |
+ 'Sliced download requires fast crcmod.') |
+ @SkipForS3('No sliced download support for S3.') |
+ def test_cp_resumable_download_generation_differs(self): |
+ """Tests that a resumable download restarts if the generation differs.""" |
+ bucket_uri = self.CreateBucket() |
+ file_contents = 'abcd' * self.halt_size |
+ object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', |
+ contents=file_contents) |
+ fpath = self.CreateTempFile() |
+ |
+ test_callback_file = self.CreateTempFile( |
+ contents=pickle.dumps(_HaltingCopyCallbackHandler(False, 5))) |
+ |
+ boto_config_for_test = [ |
+ ('GSUtil', 'resumable_threshold', str(self.halt_size)), |
+ ('GSUtil', 'sliced_object_download_threshold', str(self.halt_size)), |
+ ('GSUtil', 'sliced_object_download_max_components', '3')] |
+ |
+ with SetBotoConfigForTest(boto_config_for_test): |
+ stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file, |
+ suri(object_uri), suri(fpath)], |
+ return_stderr=True, expected_status=1) |
+ self.assertIn('Artifically halting download.', stderr) |
+ |
+ # Overwrite the object with an identical object, increasing |
+ # the generation but leaving other metadata the same. |
+ identical_file = self.CreateTempFile(contents=file_contents) |
+ self.RunGsUtil(['cp', suri(identical_file), suri(object_uri)]) |
+ |
+ stderr = self.RunGsUtil(['cp', suri(object_uri), suri(fpath)], |
+ return_stderr=True) |
+ self.assertIn('Restarting download from scratch', stderr) |
+ with open(fpath, 'r') as f: |
+ self.assertEqual(f.read(), file_contents, 'File contents differ') |
+ |
def test_cp_resumable_download_file_larger(self): |
"""Tests download deletes the tracker file when existing file is larger.""" |
bucket_uri = self.CreateBucket() |
@@ -1531,13 +1629,12 @@ class TestCp(testcase.GsUtilIntegrationTestCase): |
suri(object_uri), fpath], |
expected_status=1, return_stderr=True) |
self.assertIn('Artifically halting download.', stderr) |
- with open(fpath, 'w') as larger_file: |
+ with open(fpath + '_.gstmp', 'w') as larger_file: |
for _ in range(self.halt_size * 2): |
larger_file.write('a') |
stderr = self.RunGsUtil(['cp', suri(object_uri), fpath], |
expected_status=1, return_stderr=True) |
self.assertNotIn('Resuming download', stderr) |
- self.assertIn('is larger', stderr) |
self.assertIn('Deleting tracker file', stderr) |
def test_cp_resumable_download_content_differs(self): |
@@ -1550,7 +1647,11 @@ class TestCp(testcase.GsUtilIntegrationTestCase): |
""" |
bucket_uri = self.CreateBucket() |
tmp_dir = self.CreateTempDir() |
- fpath = self.CreateTempFile(tmpdir=tmp_dir, contents='abcd' * ONE_KIB) |
+ fpath = self.CreateTempFile(tmpdir=tmp_dir) |
+ temp_download_file = fpath + '_.gstmp' |
+ with open(temp_download_file, 'w') as fp: |
+ fp.write('abcd' * ONE_KIB) |
+ |
object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', |
contents='efgh' * ONE_KIB) |
stdout = self.RunGsUtil(['ls', '-L', suri(object_uri)], return_stdout=True) |
@@ -1569,11 +1670,13 @@ class TestCp(testcase.GsUtilIntegrationTestCase): |
with SetBotoConfigForTest([boto_config_for_test]): |
stderr = self.RunGsUtil(['cp', suri(object_uri), fpath], |
return_stderr=True, expected_status=1) |
- self.assertIn('Download already complete for file', stderr) |
+ self.assertIn('Download already complete', stderr) |
self.assertIn('doesn\'t match cloud-supplied digest', stderr) |
# File and tracker file should be deleted. |
- self.assertFalse(os.path.isfile(fpath)) |
+ self.assertFalse(os.path.isfile(temp_download_file)) |
self.assertFalse(os.path.isfile(tracker_filename)) |
+ # Permanent file should not have been created. |
+ self.assertFalse(os.path.isfile(fpath)) |
finally: |
if os.path.exists(tracker_filename): |
os.unlink(tracker_filename) |
@@ -1582,8 +1685,12 @@ class TestCp(testcase.GsUtilIntegrationTestCase): |
"""Tests download no-ops when tracker file matches existing file.""" |
bucket_uri = self.CreateBucket() |
tmp_dir = self.CreateTempDir() |
+ fpath = self.CreateTempFile(tmpdir=tmp_dir) |
matching_contents = 'abcd' * ONE_KIB |
- fpath = self.CreateTempFile(tmpdir=tmp_dir, contents=matching_contents) |
+ temp_download_file = fpath + '_.gstmp' |
+ with open(temp_download_file, 'w') as fp: |
+ fp.write(matching_contents) |
+ |
object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', |
contents=matching_contents) |
stdout = self.RunGsUtil(['ls', '-L', suri(object_uri)], return_stdout=True) |
@@ -1601,7 +1708,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase): |
with SetBotoConfigForTest([boto_config_for_test]): |
stderr = self.RunGsUtil(['cp', suri(object_uri), fpath], |
return_stderr=True) |
- self.assertIn('Download already complete for file', stderr) |
+ self.assertIn('Download already complete', stderr) |
# Tracker file should be removed after successful hash validation. |
self.assertFalse(os.path.isfile(tracker_filename)) |
finally: |
@@ -1643,6 +1750,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase): |
if os.path.exists(tracker_filename): |
os.unlink(tracker_filename) |
+ @SequentialAndParallelTransfer |
def test_cp_resumable_download_gzip(self): |
"""Tests that download can be resumed successfully with a gzipped file.""" |
# Generate some reasonably incompressible data. This compresses to a bit |
@@ -1683,12 +1791,26 @@ class TestCp(testcase.GsUtilIntegrationTestCase): |
suri(object_uri), suri(fpath2)], |
return_stderr=True, expected_status=1) |
self.assertIn('Artifically halting download.', stderr) |
- tracker_filename = GetTrackerFilePath( |
- StorageUrlFromString(fpath2), TrackerFileType.DOWNLOAD, self.test_api) |
- self.assertTrue(os.path.isfile(tracker_filename)) |
self.assertIn('Downloading to temp gzip filename', stderr) |
+ |
+ # Tracker files will have different names depending on whether we are |
+ # downloading sequentially or in parallel. |
+ sliced_download_threshold = HumanReadableToBytes( |
+ boto.config.get('GSUtil', 'sliced_object_download_threshold', |
+ DEFAULT_SLICED_OBJECT_DOWNLOAD_THRESHOLD)) |
+ sliced_download = (len(contents) > sliced_download_threshold |
+ and sliced_download_threshold > 0 |
+ and UsingCrcmodExtension(crcmod)) |
+ if sliced_download: |
+ trackerfile_type = TrackerFileType.SLICED_DOWNLOAD |
+ else: |
+ trackerfile_type = TrackerFileType.DOWNLOAD |
+ tracker_filename = GetTrackerFilePath( |
+ StorageUrlFromString(fpath2), trackerfile_type, self.test_api) |
+ |
# We should have a temporary gzipped file, a tracker file, and no |
# final file yet. |
+ self.assertTrue(os.path.isfile(tracker_filename)) |
self.assertTrue(os.path.isfile('%s_.gztmp' % fpath2)) |
stderr = self.RunGsUtil(['cp', suri(object_uri), suri(fpath2)], |
return_stderr=True) |
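|
A sketch (not part of the patch) restating the sliced-vs-sequential |
decision that the hunk above re-implements inline, using the same config |
keys and the helpers imported at the top of this file: |
|
    def _UsesSlicedDownload(object_size): |
      """Returns True if gsutil would slice a download of this size.""" |
      threshold = HumanReadableToBytes( |
          boto.config.get('GSUtil', 'sliced_object_download_threshold', |
                          DEFAULT_SLICED_OBJECT_DOWNLOAD_THRESHOLD)) |
      # Slicing requires a positive threshold, an object larger than that |
      # threshold, and the fast (C extension) crcmod implementation. |
      return (threshold > 0 and object_size > threshold |
              and UsingCrcmodExtension(crcmod)) |
|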
@@ -1698,6 +1820,31 @@ class TestCp(testcase.GsUtilIntegrationTestCase): |
self.assertFalse(os.path.isfile(tracker_filename)) |
self.assertFalse(os.path.isfile('%s_.gztmp' % fpath2)) |
+ @SequentialAndParallelTransfer |
+ def test_cp_resumable_download_check_hashes_never(self): |
+ """Tests that resumble downloads work with check_hashes = never.""" |
+ bucket_uri = self.CreateBucket() |
+ contents = 'abcd' * self.halt_size |
+ object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', |
+ contents=contents) |
+ fpath = self.CreateTempFile() |
+ test_callback_file = self.CreateTempFile( |
+ contents=pickle.dumps(_HaltingCopyCallbackHandler(False, 5))) |
+ |
+ boto_config_for_test = [('GSUtil', 'resumable_threshold', str(ONE_KIB)), |
+ ('GSUtil', 'check_hashes', 'never')] |
+ with SetBotoConfigForTest(boto_config_for_test): |
+ stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file, |
+ suri(object_uri), fpath], |
+ expected_status=1, return_stderr=True) |
+ self.assertIn('Artifically halting download.', stderr) |
+ stderr = self.RunGsUtil(['cp', suri(object_uri), fpath], |
+ return_stderr=True) |
+ self.assertIn('Resuming download', stderr) |
+ self.assertIn('Found no hashes to validate object downloaded', stderr) |
+ with open(fpath, 'r') as f: |
+ self.assertEqual(f.read(), contents, 'File contents did not match.') |
+ |
@SkipForS3('No resumable upload support for S3.') |
def test_cp_resumable_upload_bucket_deleted(self): |
"""Tests that a not found exception is raised if bucket no longer exists.""" |
@@ -1715,6 +1862,312 @@ class TestCp(testcase.GsUtilIntegrationTestCase): |
self.assertIn('Deleting bucket', stderr) |
self.assertIn('bucket does not exist', stderr) |
+ @SkipForS3('No sliced download support for S3.') |
+ def test_cp_sliced_download(self): |
+ """Tests that sliced object download works in the general case.""" |
+ bucket_uri = self.CreateBucket() |
+ object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', |
+ contents='abc' * ONE_KIB) |
+ fpath = self.CreateTempFile() |
+ |
+ # Force the fast-crcmod check to return True to test the basic sliced |
+ # download scenario, ensuring that sliced downloads work when the user |
+ # has crcmod installed. |
+ boto_config_for_test = [ |
+ ('GSUtil', 'resumable_threshold', str(ONE_KIB)), |
+ ('GSUtil', 'test_assume_fast_crcmod', 'True'), |
+ ('GSUtil', 'sliced_object_download_threshold', str(ONE_KIB)), |
+ ('GSUtil', 'sliced_object_download_max_components', '3')] |
+ |
+ with SetBotoConfigForTest(boto_config_for_test): |
+ self.RunGsUtil(['cp', suri(object_uri), fpath]) |
+ |
+ # Each tracker file should have been deleted. |
+ tracker_filenames = GetSlicedDownloadTrackerFilePaths( |
+ StorageUrlFromString(fpath), self.test_api) |
+ for tracker_filename in tracker_filenames: |
+ self.assertFalse(os.path.isfile(tracker_filename)) |
+ |
+ with open(fpath, 'r') as f: |
+ self.assertEqual(f.read(), 'abc' * ONE_KIB, 'File contents differ') |
+ |
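|
For reference (not part of the patch), the SetBotoConfigForTest tuples in |
these sliced-download tests correspond to a .boto file like the following |
sketch; the values mirror the test above (ONE_KIB is 1024 bytes) and are |
not suggested production settings: |
|
    [GSUtil] |
    resumable_threshold = 1024 |
    test_assume_fast_crcmod = True |
    sliced_object_download_threshold = 1024 |
    sliced_object_download_max_components = 3 |
|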
+ @unittest.skipUnless(UsingCrcmodExtension(crcmod), |
+ 'Sliced download requires fast crcmod.') |
+ @SkipForS3('No sliced download support for S3.') |
+ def test_cp_unresumable_sliced_download(self): |
+ """Tests sliced download works when resumability is disabled.""" |
+ bucket_uri = self.CreateBucket() |
+ object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', |
+ contents='abcd' * self.halt_size) |
+ fpath = self.CreateTempFile() |
+ test_callback_file = self.CreateTempFile( |
+ contents=pickle.dumps(_HaltingCopyCallbackHandler(False, 5))) |
+ |
+ boto_config_for_test = [ |
+ ('GSUtil', 'resumable_threshold', str(self.halt_size * 5)), |
+ ('GSUtil', 'sliced_object_download_threshold', str(self.halt_size)), |
+ ('GSUtil', 'sliced_object_download_max_components', '4')] |
+ |
+ with SetBotoConfigForTest(boto_config_for_test): |
+ stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file, |
+ suri(object_uri), suri(fpath)], |
+ return_stderr=True, expected_status=1) |
+ self.assertIn('not downloaded successfully', stderr) |
+ # Temporary download file should exist. |
+ self.assertTrue(os.path.isfile(fpath + '_.gstmp')) |
+ |
+ # No tracker files should exist. |
+ tracker_filenames = GetSlicedDownloadTrackerFilePaths( |
+ StorageUrlFromString(fpath), self.test_api) |
+ for tracker_filename in tracker_filenames: |
+ self.assertFalse(os.path.isfile(tracker_filename)) |
+ |
+ # Perform the entire download, without resuming. |
+ with SetBotoConfigForTest(boto_config_for_test): |
+ stderr = self.RunGsUtil(['cp', suri(object_uri), suri(fpath)], |
+ return_stderr=True) |
+ self.assertNotIn('Resuming download', stderr) |
+ # Temporary download file should have been deleted. |
+ self.assertFalse(os.path.isfile(fpath + '_.gstmp')) |
+ with open(fpath, 'r') as f: |
+ self.assertEqual(f.read(), 'abcd' * self.halt_size, |
+ 'File contents differ') |
+ |
+ @unittest.skipUnless(UsingCrcmodExtension(crcmod), |
+ 'Sliced download requires fast crcmod.') |
+ @SkipForS3('No sliced download support for S3.') |
+ def test_cp_sliced_download_resume(self): |
+ """Tests that sliced object download is resumable.""" |
+ bucket_uri = self.CreateBucket() |
+ object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', |
+ contents='abc' * self.halt_size) |
+ fpath = self.CreateTempFile() |
+ test_callback_file = self.CreateTempFile( |
+ contents=pickle.dumps(_HaltingCopyCallbackHandler(False, 5))) |
+ |
+ boto_config_for_test = [ |
+ ('GSUtil', 'resumable_threshold', str(self.halt_size)), |
+ ('GSUtil', 'sliced_object_download_threshold', str(self.halt_size)), |
+ ('GSUtil', 'sliced_object_download_max_components', '3')] |
+ |
+ with SetBotoConfigForTest(boto_config_for_test): |
+ stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file, |
+ suri(object_uri), suri(fpath)], |
+ return_stderr=True, expected_status=1) |
+ self.assertIn('not downloaded successfully', stderr) |
+ |
+ # Each tracker file should exist. |
+ tracker_filenames = GetSlicedDownloadTrackerFilePaths( |
+ StorageUrlFromString(fpath), self.test_api) |
+ for tracker_filename in tracker_filenames: |
+ self.assertTrue(os.path.isfile(tracker_filename)) |
+ |
+ stderr = self.RunGsUtil(['cp', suri(object_uri), fpath], |
+ return_stderr=True) |
+ self.assertIn('Resuming download', stderr) |
+ |
+ # Each tracker file should have been deleted. |
+ tracker_filenames = GetSlicedDownloadTrackerFilePaths( |
+ StorageUrlFromString(fpath), self.test_api) |
+ for tracker_filename in tracker_filenames: |
+ self.assertFalse(os.path.isfile(tracker_filename)) |
+ |
+ with open(fpath, 'r') as f: |
+ self.assertEqual(f.read(), 'abc' * self.halt_size, |
+ 'File contents differ') |
+ |
+ @unittest.skipUnless(UsingCrcmodExtension(crcmod), |
+ 'Sliced download requires fast crcmod.') |
+ @SkipForS3('No sliced download support for S3.') |
+ def test_cp_sliced_download_partial_resume(self): |
+ """Test sliced download resumability when some components are finished.""" |
+ bucket_uri = self.CreateBucket() |
+ object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', |
+ contents='abc' * self.halt_size) |
+ fpath = self.CreateTempFile() |
+ test_callback_file = self.CreateTempFile( |
+ contents=pickle.dumps(_HaltOneComponentCopyCallbackHandler(5))) |
+ |
+ boto_config_for_test = [ |
+ ('GSUtil', 'resumable_threshold', str(self.halt_size)), |
+ ('GSUtil', 'sliced_object_download_threshold', str(self.halt_size)), |
+ ('GSUtil', 'sliced_object_download_max_components', '3')] |
+ |
+ with SetBotoConfigForTest(boto_config_for_test): |
+ stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file, |
+ suri(object_uri), suri(fpath)], |
+ return_stderr=True, expected_status=1) |
+ self.assertIn('not downloaded successfully', stderr) |
+ |
+ # Each tracker file should exist. |
+ tracker_filenames = GetSlicedDownloadTrackerFilePaths( |
+ StorageUrlFromString(fpath), self.test_api) |
+ for tracker_filename in tracker_filenames: |
+ self.assertTrue(os.path.isfile(tracker_filename)) |
+ |
+ stderr = self.RunGsUtil(['cp', suri(object_uri), fpath], |
+ return_stderr=True) |
+ self.assertIn('Resuming download', stderr) |
+ self.assertIn('Download already complete', stderr) |
+ |
+ # Each tracker file should have been deleted. |
+ tracker_filenames = GetSlicedDownloadTrackerFilePaths( |
+ StorageUrlFromString(fpath), self.test_api) |
+ for tracker_filename in tracker_filenames: |
+ self.assertFalse(os.path.isfile(tracker_filename)) |
+ |
+ with open(fpath, 'r') as f: |
+ self.assertEqual(f.read(), 'abc' * self.halt_size, |
+ 'File contents differ') |
+ |
+ @unittest.skipUnless(UsingCrcmodExtension(crcmod), |
+ 'Sliced download requires fast crcmod.') |
+ @SkipForS3('No sliced download support for S3.') |
+ def test_cp_sliced_download_resume_content_differs(self): |
+ """Tests differing file contents are detected by sliced downloads.""" |
+ bucket_uri = self.CreateBucket() |
+ object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', |
+ contents='abc' * self.halt_size) |
+ fpath = self.CreateTempFile(contents='') |
+ test_callback_file = self.CreateTempFile( |
+ contents=pickle.dumps(_HaltingCopyCallbackHandler(False, 5))) |
+ |
+ boto_config_for_test = [ |
+ ('GSUtil', 'resumable_threshold', str(self.halt_size)), |
+ ('GSUtil', 'sliced_object_download_threshold', str(self.halt_size)), |
+ ('GSUtil', 'sliced_object_download_max_components', '3')] |
+ |
+ with SetBotoConfigForTest(boto_config_for_test): |
+ stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file, |
+ suri(object_uri), suri(fpath)], |
+ return_stderr=True, expected_status=1) |
+ self.assertIn('not downloaded successfully', stderr) |
+ |
+ # Temporary download file should exist. |
+ self.assertTrue(os.path.isfile(fpath + '_.gstmp')) |
+ |
+ # Each tracker file should exist. |
+ tracker_filenames = GetSlicedDownloadTrackerFilePaths( |
+ StorageUrlFromString(fpath), self.test_api) |
+ for tracker_filename in tracker_filenames: |
+ self.assertTrue(os.path.isfile(tracker_filename)) |
+ |
+ with open(fpath + '_.gstmp', 'r+b') as f: |
+ f.write('altered file contents') |
+ |
+ stderr = self.RunGsUtil(['cp', suri(object_uri), fpath], |
+ return_stderr=True, expected_status=1) |
+ self.assertIn('Resuming download', stderr) |
+ self.assertIn('doesn\'t match cloud-supplied digest', stderr) |
+ self.assertIn('HashMismatchException: crc32c', stderr) |
+ |
+ # Each tracker file should have been deleted. |
+ tracker_filenames = GetSlicedDownloadTrackerFilePaths( |
+ StorageUrlFromString(fpath), self.test_api) |
+ for tracker_filename in tracker_filenames: |
+ self.assertFalse(os.path.isfile(tracker_filename)) |
+ |
+ # Temporary file should have been deleted due to hash mismatch. |
+ self.assertFalse(os.path.isfile(fpath + '_.gstmp')) |
+ # Final file should not exist. |
+ self.assertFalse(os.path.isfile(fpath)) |
+ |
+ @unittest.skipUnless(UsingCrcmodExtension(crcmod), |
+ 'Sliced download requires fast crcmod.') |
+ @SkipForS3('No sliced download support for S3.') |
+ def test_cp_sliced_download_component_size_changed(self): |
+ """Tests sliced download doesn't break when the boto config changes. |
+ |
+ If the number of components used changes cross-process, the download should |
+ be restarted. |
+ """ |
+ bucket_uri = self.CreateBucket() |
+ object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', |
+ contents='abcd' * self.halt_size) |
+ fpath = self.CreateTempFile() |
+ test_callback_file = self.CreateTempFile( |
+ contents=pickle.dumps(_HaltingCopyCallbackHandler(False, 5))) |
+ |
+ boto_config_for_test = [ |
+ ('GSUtil', 'resumable_threshold', str(self.halt_size)), |
+ ('GSUtil', 'sliced_object_download_threshold', str(self.halt_size)), |
+ ('GSUtil', 'sliced_object_download_component_size', |
str(self.halt_size // 4)), |
+ ('GSUtil', 'sliced_object_download_max_components', '4')] |
+ |
+ with SetBotoConfigForTest(boto_config_for_test): |
+ stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file, |
+ suri(object_uri), suri(fpath)], |
+ return_stderr=True, expected_status=1) |
+ self.assertIn('not downloaded successfully', stderr) |
+ |
+ boto_config_for_test = [ |
+ ('GSUtil', 'resumable_threshold', str(self.halt_size)), |
+ ('GSUtil', 'sliced_object_download_threshold', str(self.halt_size)), |
+ ('GSUtil', 'sliced_object_download_component_size', |
str(self.halt_size // 2)), |
+ ('GSUtil', 'sliced_object_download_max_components', '2')] |
+ |
+ with SetBotoConfigForTest(boto_config_for_test): |
+ stderr = self.RunGsUtil(['cp', suri(object_uri), fpath], |
+ return_stderr=True) |
+ self.assertIn('Sliced download tracker file doesn\'t match ', stderr) |
+ self.assertIn('Restarting download from scratch', stderr) |
+ self.assertNotIn('Resuming download', stderr) |
+ |
+ @unittest.skipUnless(UsingCrcmodExtension(crcmod), |
+ 'Sliced download requires fast crcmod.') |
+ @SkipForS3('No sliced download support for S3.') |
+ def test_cp_sliced_download_disabled_cross_process(self): |
+ """Tests temporary files are not orphaned if sliced download is disabled. |
+ |
+ Specifically, temporary files should be deleted when the corresponding |
+ non-sliced download is completed. |
+ """ |
+ bucket_uri = self.CreateBucket() |
+ object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', |
+ contents='abcd' * self.halt_size) |
+ fpath = self.CreateTempFile() |
+ test_callback_file = self.CreateTempFile( |
+ contents=pickle.dumps(_HaltingCopyCallbackHandler(False, 5))) |
+ |
+ boto_config_for_test = [ |
+ ('GSUtil', 'resumable_threshold', str(self.halt_size)), |
+ ('GSUtil', 'sliced_object_download_threshold', str(self.halt_size)), |
+ ('GSUtil', 'sliced_object_download_max_components', '4')] |
+ |
+ with SetBotoConfigForTest(boto_config_for_test): |
+ stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file, |
+ suri(object_uri), suri(fpath)], |
+ return_stderr=True, expected_status=1) |
+ self.assertIn('not downloaded successfully', stderr) |
+ # Temporary download file should exist. |
+ self.assertTrue(os.path.isfile(fpath + '_.gstmp')) |
+ |
+ # Each tracker file should exist. |
+ tracker_filenames = GetSlicedDownloadTrackerFilePaths( |
+ StorageUrlFromString(fpath), self.test_api) |
+ for tracker_filename in tracker_filenames: |
+ self.assertTrue(os.path.isfile(tracker_filename)) |
+ |
+ # Disable sliced downloads by increasing the threshold. |
+ boto_config_for_test = [ |
+ ('GSUtil', 'resumable_threshold', str(self.halt_size)), |
+ ('GSUtil', 'sliced_object_download_threshold', str(self.halt_size * 5)), |
+ ('GSUtil', 'sliced_object_download_max_components', '4')] |
+ |
+ with SetBotoConfigForTest(boto_config_for_test): |
+ stderr = self.RunGsUtil(['cp', suri(object_uri), fpath], |
+ return_stderr=True) |
+ self.assertNotIn('Resuming download', stderr) |
+ # Temporary download file should have been deleted. |
+ self.assertFalse(os.path.isfile(fpath + '_.gstmp')) |
+ |
+ # Each tracker file should have been deleted. |
+ for tracker_filename in tracker_filenames: |
+ self.assertFalse(os.path.isfile(tracker_filename)) |
+ with open(fpath, 'r') as f: |
+ self.assertEqual(f.read(), 'abcd' * self.halt_size) |
+ |
@SkipForS3('No resumable upload support for S3.') |
def test_cp_resumable_upload_start_over_http_error(self): |
for start_over_error in (404, 410): |