| Index: third_party/gsutil/gslib/tests/test_cp.py
|
| diff --git a/third_party/gsutil/gslib/tests/test_cp.py b/third_party/gsutil/gslib/tests/test_cp.py
|
| index 7c44c366a614f71eb3c020660c5bf6876e0764b9..c216fb64a6a0345bfbf5f9e0b5ee8030ce1bec63 100644
|
| --- a/third_party/gsutil/gslib/tests/test_cp.py
|
| +++ b/third_party/gsutil/gslib/tests/test_cp.py
|
| @@ -33,13 +33,14 @@ from apitools.base.py import exceptions as apitools_exceptions
|
| import boto
|
| from boto import storage_uri
|
| from boto.exception import ResumableTransferDisposition
|
| -from boto.exception import ResumableUploadException
|
| from boto.exception import StorageResponseError
|
| from boto.storage_uri import BucketStorageUri
|
| +import crcmod
|
|
|
| from gslib.cloud_api import ResumableDownloadException
|
| from gslib.cloud_api import ResumableUploadException
|
| from gslib.cloud_api import ResumableUploadStartOverException
|
| +from gslib.commands.config import DEFAULT_SLICED_OBJECT_DOWNLOAD_THRESHOLD
|
| from gslib.copy_helper import GetTrackerFilePath
|
| from gslib.copy_helper import TrackerFileType
|
| from gslib.cs_api_map import ApiSelector
|
| @@ -52,19 +53,22 @@ from gslib.tests.testcase.integration_testcase import SkipForS3
|
| from gslib.tests.util import GenerationFromURI as urigen
|
| from gslib.tests.util import HAS_S3_CREDS
|
| from gslib.tests.util import ObjectToURI as suri
|
| -from gslib.tests.util import PerformsFileToObjectUpload
|
| +from gslib.tests.util import SequentialAndParallelTransfer
|
| from gslib.tests.util import SetBotoConfigForTest
|
| from gslib.tests.util import unittest
|
| from gslib.third_party.storage_apitools import storage_v1_messages as apitools_messages
|
| from gslib.tracker_file import DeleteTrackerFile
|
| from gslib.tracker_file import GetRewriteTrackerFilePath
|
| +from gslib.tracker_file import GetSlicedDownloadTrackerFilePaths
|
| from gslib.util import EIGHT_MIB
|
| +from gslib.util import HumanReadableToBytes
|
| from gslib.util import IS_WINDOWS
|
| from gslib.util import MakeHumanReadable
|
| from gslib.util import ONE_KIB
|
| from gslib.util import ONE_MIB
|
| from gslib.util import Retry
|
| from gslib.util import START_CALLBACK_PER_BYTES
|
| +from gslib.util import UsingCrcmodExtension
|
| from gslib.util import UTF8
|
|
|
|
|
| @@ -139,6 +143,24 @@ class _XMLResumableUploadStartOverCopyCallbackHandler(object):
|
| ResumableTransferDisposition.START_OVER)
|
|
|
|
|
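| +# Test callback handlers like the one below are pickled to a temp file
|
| +# and passed to gsutil via the --testcallbackfile test hook, e.g.:
|
| +#   f = self.CreateTempFile(
|
| +#       contents=pickle.dumps(_HaltOneComponentCopyCallbackHandler(5)))
|
| +#   self.RunGsUtil(['cp', '--testcallbackfile', f, src, dst], ...)
|
| +# cp then invokes the handler's call() method as a progress callback.
|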
| +class _HaltOneComponentCopyCallbackHandler(object):
|
| + """Test callback handler for stopping part of a sliced download."""
|
| +
|
| + def __init__(self, halt_at_byte):
|
| + self._last_progress_byte = None
|
| + self._halt_at_byte = halt_at_byte
|
| +
|
| + # pylint: disable=invalid-name
|
| + # pylint: disable=unused-argument
|
| + def call(self, current_progress_byte, total_size_unused):
|
| + """Forcibly exits if the passed the halting point since the last call."""
|
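| +    # Note that this halts only when the halt byte is crossed between two
|
| +    # consecutive callbacks; the first callback never halts because there
|
| +    # is no previous progress byte to compare against.
|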
| + if (self._last_progress_byte is not None and
|
| + self._last_progress_byte < self._halt_at_byte < current_progress_byte):
|
| + sys.stderr.write('Halting transfer.\r\n')
|
| + raise ResumableDownloadException('Artifically halting download.')
|
| + self._last_progress_byte = current_progress_byte
|
| +
|
| +
|
| class _DeleteBucketThenStartOverCopyCallbackHandler(object):
|
| """Test callback handler that deletes bucket then raises start-over."""
|
|
|
| @@ -239,7 +261,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
|
| contents = pkgutil.get_data('gslib', 'tests/test_data/%s' % name)
|
| return self.CreateTempFile(file_name=name, contents=contents)
|
|
|
| - @PerformsFileToObjectUpload
|
| + @SequentialAndParallelTransfer
|
| def test_noclobber(self):
|
| key_uri = self.CreateObject(contents='foo')
|
| fpath = self.CreateTempFile(contents='bar')
|
| @@ -259,7 +281,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
|
| '%s://%s' % (self.default_provider, self.nonexistent_bucket_name))
|
| stderr = self.RunGsUtil(['cp', fpath, invalid_bucket_uri],
|
| expected_status=1, return_stderr=True)
|
| - self.assertIn('does not exist.', stderr)
|
| + self.assertIn('does not exist', stderr)
|
|
|
| def test_copy_in_cloud_noclobber(self):
|
| bucket1_uri = self.CreateBucket()
|
| @@ -275,7 +297,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
|
| self.assertIn('Skipping existing item: %s' %
|
| suri(bucket2_uri, key_uri.object_name), stderr)
|
|
|
| - @PerformsFileToObjectUpload
|
| + @SequentialAndParallelTransfer
|
| def test_streaming(self):
|
| bucket_uri = self.CreateBucket()
|
| stderr = self.RunGsUtil(['cp', '-', '%s' % suri(bucket_uri, 'foo')],
|
| @@ -293,7 +315,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
|
|
|
| # TODO: Implement a way to test both with and without using magic file.
|
|
|
| - @PerformsFileToObjectUpload
|
| + @SequentialAndParallelTransfer
|
| def test_detect_content_type(self):
|
| """Tests local detection of content type."""
|
| bucket_uri = self.CreateBucket()
|
| @@ -375,7 +397,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
|
| _Check2()
|
|
|
| @unittest.skipIf(IS_WINDOWS, 'magicfile is not available on Windows.')
|
| - @PerformsFileToObjectUpload
|
| + @SequentialAndParallelTransfer
|
| def test_magicfile_override(self):
|
| """Tests content type override with magicfile value."""
|
| bucket_uri = self.CreateBucket()
|
| @@ -393,7 +415,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
|
| self.assertRegexpMatches(stdout, r'Content-Type:\s+%s' % content_type)
|
| _Check1()
|
|
|
| - @PerformsFileToObjectUpload
|
| + @SequentialAndParallelTransfer
|
| def test_content_type_mismatches(self):
|
| """Tests overriding content type when it does not match the file type."""
|
| bucket_uri = self.CreateBucket()
|
| @@ -429,7 +451,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
|
| self.assertRegexpMatches(stdout, r'Content-Type:\s+image/gif')
|
| _Check3()
|
|
|
| - @PerformsFileToObjectUpload
|
| + @SequentialAndParallelTransfer
|
| def test_content_type_header_case_insensitive(self):
|
| """Tests that content type header is treated with case insensitivity."""
|
| bucket_uri = self.CreateBucket()
|
| @@ -459,7 +481,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
|
| self.assertNotRegexpMatches(stdout, r'image/gif,\s*image/gif')
|
| _Check2()
|
|
|
| - @PerformsFileToObjectUpload
|
| + @SequentialAndParallelTransfer
|
| def test_other_headers(self):
|
| """Tests that non-content-type headers are applied successfully on copy."""
|
| bucket_uri = self.CreateBucket()
|
| @@ -481,7 +503,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
|
| self.assertRegexpMatches(stdout, r'Cache-Control\s*:\s*public,max-age=12')
|
| self.assertRegexpMatches(stdout, r'Metadata:\s*1:\s*abcd')
|
|
|
| - @PerformsFileToObjectUpload
|
| + @SequentialAndParallelTransfer
|
| def test_versioning(self):
|
| """Tests copy with versioning."""
|
| bucket_uri = self.CreateVersionedBucket()
|
| @@ -524,11 +546,20 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
|
| expected_status=1)
|
| self.assertIn('cannot be the destination for gsutil cp', stderr)
|
|
|
| + def test_versioning_no_parallelism(self):
|
| + """Tests that copy all-versions errors when parallelism is enabled."""
|
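| +    # A nonexistent bucket suffices here: the -m/-A flag check should
|
| +    # fail before any bucket is contacted.
|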
| + stderr = self.RunGsUtil(
|
| + ['-m', 'cp', '-A', suri(self.nonexistent_bucket_name, 'foo'),
|
| + suri(self.nonexistent_bucket_name, 'bar')],
|
| + expected_status=1, return_stderr=True)
|
| + self.assertIn('-m option is not supported with the cp -A flag', stderr)
|
| +
|
| @SkipForS3('S3 lists versioned objects in reverse timestamp order.')
|
| def test_recursive_copying_versioned_bucket(self):
|
| - """Tests that cp -R with versioned buckets copies all versions in order."""
|
| + """Tests cp -R with versioned buckets."""
|
| bucket1_uri = self.CreateVersionedBucket()
|
| bucket2_uri = self.CreateVersionedBucket()
|
| + bucket3_uri = self.CreateVersionedBucket()
|
|
|
| # Write two versions of an object to the bucket1.
|
| self.CreateObject(bucket_uri=bucket1_uri, object_name='k', contents='data0')
|
| @@ -537,9 +568,12 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
|
|
|
| self.AssertNObjectsInBucket(bucket1_uri, 2, versioned=True)
|
| self.AssertNObjectsInBucket(bucket2_uri, 0, versioned=True)
|
| + self.AssertNObjectsInBucket(bucket3_uri, 0, versioned=True)
|
|
|
| # Recursively copy to second versioned bucket.
|
| - self.RunGsUtil(['cp', '-R', suri(bucket1_uri, '*'), suri(bucket2_uri)])
|
| + # -A flag should copy all versions in order.
|
| + self.RunGsUtil(['cp', '-R', '-A', suri(bucket1_uri, '*'),
|
| + suri(bucket2_uri)])
|
|
|
| # Use @Retry as hedge against bucket listing eventual consistency.
|
| @Retry(AssertionError, tries=3, timeout_secs=1)
|
| @@ -570,7 +604,31 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
|
| self.assertEquals(storage_uri(uri_str2).object_name, 'k')
|
| _Check2()
|
|
|
| - @PerformsFileToObjectUpload
|
| +  # Recursively copy to third versioned bucket with no -A flag.
|
| + # This should copy only the live object.
|
| + self.RunGsUtil(['cp', '-R', suri(bucket1_uri, '*'),
|
| + suri(bucket3_uri)])
|
| +
|
| + # Use @Retry as hedge against bucket listing eventual consistency.
|
| + @Retry(AssertionError, tries=3, timeout_secs=1)
|
| + def _Check3():
|
| + """Validates the results of the cp -R."""
|
| + listing1 = self.RunGsUtil(['ls', '-la', suri(bucket1_uri)],
|
| + return_stdout=True).split('\n')
|
| + listing2 = self.RunGsUtil(['ls', '-la', suri(bucket3_uri)],
|
| + return_stdout=True).split('\n')
|
| + # 2 lines of listing output, 1 summary line, 1 empty line from \n split.
|
| + self.assertEquals(len(listing1), 4)
|
| +      # 1 line of listing output, 1 summary line, 1 empty line from \n split.
|
| + self.assertEquals(len(listing2), 3)
|
| +
|
| + # Live (second) object in bucket 1 should match the single live object.
|
| + size1, _, uri_str1, _ = listing2[0].split()
|
| + self.assertEquals(size1, str(len('longer_data1')))
|
| + self.assertEquals(storage_uri(uri_str1).object_name, 'k')
|
| + _Check3()
|
| +
|
| + @SequentialAndParallelTransfer
|
| @SkipForS3('Preconditions not supported for S3.')
|
| def test_cp_generation_zero_match(self):
|
| """Tests that cp handles an object-not-exists precondition header."""
|
| @@ -591,7 +649,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
|
| return_stderr=True, expected_status=1)
|
| self.assertIn('PreconditionException', stderr)
|
|
|
| - @PerformsFileToObjectUpload
|
| + @SequentialAndParallelTransfer
|
| @SkipForS3('Preconditions not supported for S3.')
|
| def test_cp_v_generation_match(self):
|
| """Tests that cp -v option handles the if-generation-match header."""
|
| @@ -623,7 +681,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
|
| self.assertIn('Specifying x-goog-if-generation-match is not supported '
|
| 'with cp -n', stderr)
|
|
|
| - @PerformsFileToObjectUpload
|
| + @SequentialAndParallelTransfer
|
| def test_cp_nv(self):
|
| """Tests that cp -nv works when skipping existing file."""
|
| bucket_uri = self.CreateVersionedBucket()
|
| @@ -640,7 +698,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
|
| return_stderr=True)
|
| self.assertIn('Skipping existing item:', stderr)
|
|
|
| - @PerformsFileToObjectUpload
|
| + @SequentialAndParallelTransfer
|
| @SkipForS3('S3 lists versioned objects in reverse timestamp order.')
|
| def test_cp_v_option(self):
|
| """"Tests that cp -v returns the created object's version-specific URI."""
|
| @@ -699,7 +757,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
|
| self.assertEqual(created_uri, lines[-2])
|
| _Check1()
|
|
|
| - @PerformsFileToObjectUpload
|
| + @SequentialAndParallelTransfer
|
| def test_stdin_args(self):
|
| """Tests cp with the -I option."""
|
| tmpdir = self.CreateTempDir()
|
| @@ -844,7 +902,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
|
| self.assertEqual(public_read_acl, new_acl_json)
|
| _Check()
|
|
|
| - @PerformsFileToObjectUpload
|
| + @SequentialAndParallelTransfer
|
| def test_canned_acl_upload(self):
|
| """Tests uploading a file with a canned ACL."""
|
| bucket1_uri = self.CreateBucket()
|
| @@ -889,7 +947,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
|
| stdout = self.RunGsUtil(['cp', fpath, '-'], return_stdout=True)
|
| self.assertIn(contents, stdout)
|
|
|
| - @PerformsFileToObjectUpload
|
| + @SequentialAndParallelTransfer
|
| def test_cp_zero_byte_file(self):
|
| dst_bucket_uri = self.CreateBucket()
|
| src_dir = self.CreateTempDir()
|
| @@ -909,7 +967,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
|
| self.assertTrue(os.stat(download_path))
|
|
|
| def test_copy_bucket_to_bucket(self):
|
| - """Tests that recursively copying from bucket to bucket.
|
| + """Tests recursively copying from bucket to bucket.
|
|
|
| This should produce identically named objects (and not, in particular,
|
| destination objects named by the version-specific URI from source objects).
|
| @@ -978,9 +1036,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
|
| key_uri.set_contents_from_string('')
|
| self.AssertNObjectsInBucket(src_bucket_uri, 3)
|
|
|
| - stderr = self.RunGsUtil(['cp', '-R', suri(src_bucket_uri), dst_dir],
|
| - return_stderr=True)
|
| - self.assertIn('Skipping cloud sub-directory placeholder object', stderr)
|
| + self.RunGsUtil(['cp', '-R', suri(src_bucket_uri), dst_dir])
|
| dir_list = []
|
| for dirname, _, filenames in os.walk(dst_dir):
|
| for filename in filenames:
|
| @@ -1025,12 +1081,12 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
|
|
|
| @unittest.skipIf(IS_WINDOWS,
|
| 'Unicode handling on Windows requires mods to site-packages')
|
| - @PerformsFileToObjectUpload
|
| + @SequentialAndParallelTransfer
|
| def test_cp_manifest_upload_unicode(self):
|
| return self._ManifestUpload('foo-unicöde', 'bar-unicöde',
|
| 'manifest-unicöde')
|
|
|
| - @PerformsFileToObjectUpload
|
| + @SequentialAndParallelTransfer
|
| def test_cp_manifest_upload(self):
|
| """Tests uploading with a mnifest file."""
|
| return self._ManifestUpload('foo', 'bar', 'manifest')
|
| @@ -1071,7 +1127,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
|
| self.assertEqual(int(results[7]), 3) # Bytes Transferred
|
| self.assertEqual(results[8], 'OK') # Result
|
|
|
| - @PerformsFileToObjectUpload
|
| + @SequentialAndParallelTransfer
|
| def test_cp_manifest_download(self):
|
| """Tests downloading with a manifest file."""
|
| key_uri = self.CreateObject(contents='foo')
|
| @@ -1097,14 +1153,13 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
|
| start_date = datetime.datetime.strptime(results[2], date_format)
|
| end_date = datetime.datetime.strptime(results[3], date_format)
|
| self.assertEqual(end_date > start_date, True)
|
| - self.assertEqual(results[4], 'rL0Y20zC+Fzt72VPzMSk2A==') # md5
|
| self.assertEqual(int(results[6]), 3) # Source Size
|
| # Bytes transferred might be more than 3 if the file was gzipped, since
|
| # the minimum gzip header is 10 bytes.
|
| self.assertGreaterEqual(int(results[7]), 3) # Bytes Transferred
|
| self.assertEqual(results[8], 'OK') # Result
|
|
|
| - @PerformsFileToObjectUpload
|
| + @SequentialAndParallelTransfer
|
| def test_copy_unicode_non_ascii_filename(self):
|
| key_uri = self.CreateObject(contents='foo')
|
| # Make file large enough to cause a resumable upload (which hashes filename
|
| @@ -1122,6 +1177,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
|
| # such files (so, failed that test). Given that, we decided to remove the
|
| # test.
|
|
|
| + @SequentialAndParallelTransfer
|
| def test_gzip_upload_and_download(self):
|
| bucket_uri = self.CreateBucket()
|
| contents = 'x' * 10000
|
| @@ -1158,6 +1214,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
|
| self.assertIn('Copying file:', stderr)
|
| self.AssertNObjectsInBucket(bucket_uri, 1)
|
|
|
| + @SequentialAndParallelTransfer
|
| def test_cp_object_ending_with_slash(self):
|
| """Tests that cp works with object names ending with slash."""
|
| tmpdir = self.CreateTempDir()
|
| @@ -1214,6 +1271,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
|
| self.RunGsUtil(['-m', 'cp', wildcard_uri, suri(bucket_uri)])
|
| self.AssertNObjectsInBucket(bucket_uri, num_test_files)
|
|
|
| + @SequentialAndParallelTransfer
|
| def test_cp_duplicate_source_args(self):
|
| """Tests that cp -m works when a source argument is provided twice."""
|
| object_contents = 'edge'
|
| @@ -1416,7 +1474,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
|
| @NotParallelizable
|
| @SkipForS3('No resumable upload support for S3.')
|
| @unittest.skipIf(IS_WINDOWS, 'chmod on dir unsupported on Windows.')
|
| - @PerformsFileToObjectUpload
|
| + @SequentialAndParallelTransfer
|
| def test_cp_unwritable_tracker_file(self):
|
| """Tests a resumable upload with an unwritable tracker file."""
|
| bucket_uri = self.CreateBucket()
|
| @@ -1443,6 +1501,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
|
| # interferes with any parallel running tests that use the tracker directory.
|
| @NotParallelizable
|
| @unittest.skipIf(IS_WINDOWS, 'chmod on dir unsupported on Windows.')
|
| + @SequentialAndParallelTransfer
|
| def test_cp_unwritable_tracker_file_download(self):
|
| """Tests downloads with an unwritable tracker file."""
|
| object_uri = self.CreateObject(contents='foo' * ONE_KIB)
|
| @@ -1491,6 +1550,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
|
| return_stderr=True)
|
| self.assertIn('Resuming download', stderr)
|
|
|
| + @SequentialAndParallelTransfer
|
| def test_cp_resumable_download_etag_differs(self):
|
| """Tests that download restarts the file when the source object changes.
|
|
|
| @@ -1498,7 +1558,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
|
| """
|
| bucket_uri = self.CreateBucket()
|
| object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
|
| - contents='a' * self.halt_size)
|
| + contents='abc' * self.halt_size)
|
| fpath = self.CreateTempFile()
|
| test_callback_file = self.CreateTempFile(
|
| contents=pickle.dumps(_HaltingCopyCallbackHandler(False, 5)))
|
| @@ -1517,6 +1577,44 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
|
| return_stderr=True)
|
| self.assertNotIn('Resuming download', stderr)
|
|
|
| + # TODO: Enable this test for sequential downloads when their tracker files are
|
| + # modified to contain the source object generation.
|
| + @unittest.skipUnless(UsingCrcmodExtension(crcmod),
|
| + 'Sliced download requires fast crcmod.')
|
| + @SkipForS3('No sliced download support for S3.')
|
| + def test_cp_resumable_download_generation_differs(self):
|
| + """Tests that a resumable download restarts if the generation differs."""
|
| + bucket_uri = self.CreateBucket()
|
| + file_contents = 'abcd' * self.halt_size
|
| + object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
|
| + contents=file_contents)
|
| + fpath = self.CreateTempFile()
|
| +
|
| + test_callback_file = self.CreateTempFile(
|
| + contents=pickle.dumps(_HaltingCopyCallbackHandler(False, 5)))
|
| +
|
| + boto_config_for_test = [
|
| + ('GSUtil', 'resumable_threshold', str(self.halt_size)),
|
| + ('GSUtil', 'sliced_object_download_threshold', str(self.halt_size)),
|
| + ('GSUtil', 'sliced_object_download_max_components', '3')]
|
| +
|
| + with SetBotoConfigForTest(boto_config_for_test):
|
| + stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file,
|
| + suri(object_uri), suri(fpath)],
|
| + return_stderr=True, expected_status=1)
|
| + self.assertIn('Artifically halting download.', stderr)
|
| +
|
| + # Overwrite the object with an identical object, increasing
|
| + # the generation but leaving other metadata the same.
|
| + identical_file = self.CreateTempFile(contents=file_contents)
|
| + self.RunGsUtil(['cp', suri(identical_file), suri(object_uri)])
|
| +
|
| + stderr = self.RunGsUtil(['cp', suri(object_uri), suri(fpath)],
|
| + return_stderr=True)
|
| + self.assertIn('Restarting download from scratch', stderr)
|
| + with open(fpath, 'r') as f:
|
| + self.assertEqual(f.read(), file_contents, 'File contents differ')
|
| +
|
| def test_cp_resumable_download_file_larger(self):
|
| """Tests download deletes the tracker file when existing file is larger."""
|
| bucket_uri = self.CreateBucket()
|
| @@ -1531,13 +1629,12 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
|
| suri(object_uri), fpath],
|
| expected_status=1, return_stderr=True)
|
| self.assertIn('Artifically halting download.', stderr)
|
| - with open(fpath, 'w') as larger_file:
|
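| +    # The partial download lives at the '_.gstmp' temporary path, so grow
|
| +    # that file rather than the final destination.
|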
| + with open(fpath + '_.gstmp', 'w') as larger_file:
|
| for _ in range(self.halt_size * 2):
|
| larger_file.write('a')
|
| stderr = self.RunGsUtil(['cp', suri(object_uri), fpath],
|
| expected_status=1, return_stderr=True)
|
| self.assertNotIn('Resuming download', stderr)
|
| - self.assertIn('is larger', stderr)
|
| self.assertIn('Deleting tracker file', stderr)
|
|
|
| def test_cp_resumable_download_content_differs(self):
|
| @@ -1550,7 +1647,11 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
|
| """
|
| bucket_uri = self.CreateBucket()
|
| tmp_dir = self.CreateTempDir()
|
| - fpath = self.CreateTempFile(tmpdir=tmp_dir, contents='abcd' * ONE_KIB)
|
| + fpath = self.CreateTempFile(tmpdir=tmp_dir)
|
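| +    # Simulate a partial download by seeding the '_.gstmp' temporary file
|
| +    # with bytes that differ from the object's contents.
|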
| + temp_download_file = fpath + '_.gstmp'
|
| + with open(temp_download_file, 'w') as fp:
|
| + fp.write('abcd' * ONE_KIB)
|
| +
|
| object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
|
| contents='efgh' * ONE_KIB)
|
| stdout = self.RunGsUtil(['ls', '-L', suri(object_uri)], return_stdout=True)
|
| @@ -1569,11 +1670,13 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
|
| with SetBotoConfigForTest([boto_config_for_test]):
|
| stderr = self.RunGsUtil(['cp', suri(object_uri), fpath],
|
| return_stderr=True, expected_status=1)
|
| - self.assertIn('Download already complete for file', stderr)
|
| + self.assertIn('Download already complete', stderr)
|
| self.assertIn('doesn\'t match cloud-supplied digest', stderr)
|
| # File and tracker file should be deleted.
|
| - self.assertFalse(os.path.isfile(fpath))
|
| + self.assertFalse(os.path.isfile(temp_download_file))
|
| self.assertFalse(os.path.isfile(tracker_filename))
|
| + # Permanent file should not have been created.
|
| + self.assertFalse(os.path.isfile(fpath))
|
| finally:
|
| if os.path.exists(tracker_filename):
|
| os.unlink(tracker_filename)
|
| @@ -1582,8 +1685,12 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
|
| """Tests download no-ops when tracker file matches existing file."""
|
| bucket_uri = self.CreateBucket()
|
| tmp_dir = self.CreateTempDir()
|
| + fpath = self.CreateTempFile(tmpdir=tmp_dir)
|
| matching_contents = 'abcd' * ONE_KIB
|
| - fpath = self.CreateTempFile(tmpdir=tmp_dir, contents=matching_contents)
|
| + temp_download_file = fpath + '_.gstmp'
|
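| +    # Pre-seed the temporary download file with the complete object bytes.
|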
| + with open(temp_download_file, 'w') as fp:
|
| + fp.write(matching_contents)
|
| +
|
| object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
|
| contents=matching_contents)
|
| stdout = self.RunGsUtil(['ls', '-L', suri(object_uri)], return_stdout=True)
|
| @@ -1601,7 +1708,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
|
| with SetBotoConfigForTest([boto_config_for_test]):
|
| stderr = self.RunGsUtil(['cp', suri(object_uri), fpath],
|
| return_stderr=True)
|
| - self.assertIn('Download already complete for file', stderr)
|
| + self.assertIn('Download already complete', stderr)
|
| # Tracker file should be removed after successful hash validation.
|
| self.assertFalse(os.path.isfile(tracker_filename))
|
| finally:
|
| @@ -1643,6 +1750,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
|
| if os.path.exists(tracker_filename):
|
| os.unlink(tracker_filename)
|
|
|
| + @SequentialAndParallelTransfer
|
| def test_cp_resumable_download_gzip(self):
|
| """Tests that download can be resumed successfully with a gzipped file."""
|
| # Generate some reasonably incompressible data. This compresses to a bit
|
| @@ -1683,12 +1791,26 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
|
| suri(object_uri), suri(fpath2)],
|
| return_stderr=True, expected_status=1)
|
| self.assertIn('Artifically halting download.', stderr)
|
| - tracker_filename = GetTrackerFilePath(
|
| - StorageUrlFromString(fpath2), TrackerFileType.DOWNLOAD, self.test_api)
|
| - self.assertTrue(os.path.isfile(tracker_filename))
|
| self.assertIn('Downloading to temp gzip filename', stderr)
|
| +
|
| +  # Tracker files will have different names depending on whether we are
|
| +  # downloading sequentially or in parallel.
|
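| +  # These conditions mirror the criteria cp itself uses to decide whether
|
| +  # a download is sliced: size above a nonzero threshold, fast crcmod.
|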
| + sliced_download_threshold = HumanReadableToBytes(
|
| + boto.config.get('GSUtil', 'sliced_object_download_threshold',
|
| + DEFAULT_SLICED_OBJECT_DOWNLOAD_THRESHOLD))
|
| + sliced_download = (len(contents) > sliced_download_threshold
|
| + and sliced_download_threshold > 0
|
| + and UsingCrcmodExtension(crcmod))
|
| + if sliced_download:
|
| + trackerfile_type = TrackerFileType.SLICED_DOWNLOAD
|
| + else:
|
| + trackerfile_type = TrackerFileType.DOWNLOAD
|
| + tracker_filename = GetTrackerFilePath(
|
| + StorageUrlFromString(fpath2), trackerfile_type, self.test_api)
|
| +
|
| # We should have a temporary gzipped file, a tracker file, and no
|
| # final file yet.
|
| + self.assertTrue(os.path.isfile(tracker_filename))
|
| self.assertTrue(os.path.isfile('%s_.gztmp' % fpath2))
|
| stderr = self.RunGsUtil(['cp', suri(object_uri), suri(fpath2)],
|
| return_stderr=True)
|
| @@ -1698,6 +1820,31 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
|
| self.assertFalse(os.path.isfile(tracker_filename))
|
| self.assertFalse(os.path.isfile('%s_.gztmp' % fpath2))
|
|
|
| + @SequentialAndParallelTransfer
|
| + def test_cp_resumable_download_check_hashes_never(self):
|
| + """Tests that resumble downloads work with check_hashes = never."""
|
| + bucket_uri = self.CreateBucket()
|
| + contents = 'abcd' * self.halt_size
|
| + object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
|
| + contents=contents)
|
| + fpath = self.CreateTempFile()
|
| + test_callback_file = self.CreateTempFile(
|
| + contents=pickle.dumps(_HaltingCopyCallbackHandler(False, 5)))
|
| +
|
| + boto_config_for_test = [('GSUtil', 'resumable_threshold', str(ONE_KIB)),
|
| + ('GSUtil', 'check_hashes', 'never')]
|
| + with SetBotoConfigForTest(boto_config_for_test):
|
| + stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file,
|
| + suri(object_uri), fpath],
|
| + expected_status=1, return_stderr=True)
|
| + self.assertIn('Artifically halting download.', stderr)
|
| + stderr = self.RunGsUtil(['cp', suri(object_uri), fpath],
|
| + return_stderr=True)
|
| + self.assertIn('Resuming download', stderr)
|
| + self.assertIn('Found no hashes to validate object downloaded', stderr)
|
| + with open(fpath, 'r') as f:
|
| + self.assertEqual(f.read(), contents, 'File contents did not match.')
|
| +
|
| @SkipForS3('No resumable upload support for S3.')
|
| def test_cp_resumable_upload_bucket_deleted(self):
|
| """Tests that a not found exception is raised if bucket no longer exists."""
|
| @@ -1715,6 +1862,312 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
|
| self.assertIn('Deleting bucket', stderr)
|
| self.assertIn('bucket does not exist', stderr)
|
|
|
| + @SkipForS3('No sliced download support for S3.')
|
| + def test_cp_sliced_download(self):
|
| + """Tests that sliced object download works in the general case."""
|
| + bucket_uri = self.CreateBucket()
|
| + object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
|
| + contents='abc' * ONE_KIB)
|
| + fpath = self.CreateTempFile()
|
| +
|
| + # Force fast crcmod to return True to test the basic sliced download
|
| + # scenario, ensuring that if the user installs crcmod, it will work.
|
| + boto_config_for_test = [
|
| + ('GSUtil', 'resumable_threshold', str(ONE_KIB)),
|
| + ('GSUtil', 'test_assume_fast_crcmod', 'True'),
|
| + ('GSUtil', 'sliced_object_download_threshold', str(ONE_KIB)),
|
| + ('GSUtil', 'sliced_object_download_max_components', '3')]
|
| +
|
| + with SetBotoConfigForTest(boto_config_for_test):
|
| + self.RunGsUtil(['cp', suri(object_uri), fpath])
|
| +
|
| + # Each tracker file should have been deleted.
|
| + tracker_filenames = GetSlicedDownloadTrackerFilePaths(
|
| + StorageUrlFromString(fpath), self.test_api)
|
| + for tracker_filename in tracker_filenames:
|
| + self.assertFalse(os.path.isfile(tracker_filename))
|
| +
|
| + with open(fpath, 'r') as f:
|
| + self.assertEqual(f.read(), 'abc' * ONE_KIB, 'File contents differ')
|
| +
|
| + @unittest.skipUnless(UsingCrcmodExtension(crcmod),
|
| + 'Sliced download requires fast crcmod.')
|
| + @SkipForS3('No sliced download support for S3.')
|
| + def test_cp_unresumable_sliced_download(self):
|
| + """Tests sliced download works when resumability is disabled."""
|
| + bucket_uri = self.CreateBucket()
|
| + object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
|
| + contents='abcd' * self.halt_size)
|
| + fpath = self.CreateTempFile()
|
| + test_callback_file = self.CreateTempFile(
|
| + contents=pickle.dumps(_HaltingCopyCallbackHandler(False, 5)))
|
| +
|
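| +    # The object is 4 * halt_size bytes, so resumable_threshold at
|
| +    # halt_size * 5 disables resumability while the sliced threshold at
|
| +    # halt_size still triggers a sliced download.
|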
| + boto_config_for_test = [
|
| + ('GSUtil', 'resumable_threshold', str(self.halt_size*5)),
|
| + ('GSUtil', 'sliced_object_download_threshold', str(self.halt_size)),
|
| + ('GSUtil', 'sliced_object_download_max_components', '4')]
|
| +
|
| + with SetBotoConfigForTest(boto_config_for_test):
|
| + stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file,
|
| + suri(object_uri), suri(fpath)],
|
| + return_stderr=True, expected_status=1)
|
| + self.assertIn('not downloaded successfully', stderr)
|
| + # Temporary download file should exist.
|
| + self.assertTrue(os.path.isfile(fpath + '_.gstmp'))
|
| +
|
| + # No tracker files should exist.
|
| + tracker_filenames = GetSlicedDownloadTrackerFilePaths(
|
| + StorageUrlFromString(fpath), self.test_api)
|
| + for tracker_filename in tracker_filenames:
|
| + self.assertFalse(os.path.isfile(tracker_filename))
|
| +
|
| + # Perform the entire download, without resuming.
|
| + with SetBotoConfigForTest(boto_config_for_test):
|
| + stderr = self.RunGsUtil(['cp', suri(object_uri), suri(fpath)],
|
| + return_stderr=True)
|
| + self.assertNotIn('Resuming download', stderr)
|
| + # Temporary download file should have been deleted.
|
| + self.assertFalse(os.path.isfile(fpath + '_.gstmp'))
|
| + with open(fpath, 'r') as f:
|
| + self.assertEqual(f.read(), 'abcd' * self.halt_size,
|
| + 'File contents differ')
|
| +
|
| + @unittest.skipUnless(UsingCrcmodExtension(crcmod),
|
| + 'Sliced download requires fast crcmod.')
|
| + @SkipForS3('No sliced download support for S3.')
|
| + def test_cp_sliced_download_resume(self):
|
| + """Tests that sliced object download is resumable."""
|
| + bucket_uri = self.CreateBucket()
|
| + object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
|
| + contents='abc' * self.halt_size)
|
| + fpath = self.CreateTempFile()
|
| + test_callback_file = self.CreateTempFile(
|
| + contents=pickle.dumps(_HaltingCopyCallbackHandler(False, 5)))
|
| +
|
| + boto_config_for_test = [
|
| + ('GSUtil', 'resumable_threshold', str(self.halt_size)),
|
| + ('GSUtil', 'sliced_object_download_threshold', str(self.halt_size)),
|
| + ('GSUtil', 'sliced_object_download_max_components', '3')]
|
| +
|
| + with SetBotoConfigForTest(boto_config_for_test):
|
| + stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file,
|
| + suri(object_uri), suri(fpath)],
|
| + return_stderr=True, expected_status=1)
|
| + self.assertIn('not downloaded successfully', stderr)
|
| +
|
| + # Each tracker file should exist.
|
| + tracker_filenames = GetSlicedDownloadTrackerFilePaths(
|
| + StorageUrlFromString(fpath), self.test_api)
|
| + for tracker_filename in tracker_filenames:
|
| + self.assertTrue(os.path.isfile(tracker_filename))
|
| +
|
| + stderr = self.RunGsUtil(['cp', suri(object_uri), fpath],
|
| + return_stderr=True)
|
| + self.assertIn('Resuming download', stderr)
|
| +
|
| + # Each tracker file should have been deleted.
|
| + tracker_filenames = GetSlicedDownloadTrackerFilePaths(
|
| + StorageUrlFromString(fpath), self.test_api)
|
| + for tracker_filename in tracker_filenames:
|
| + self.assertFalse(os.path.isfile(tracker_filename))
|
| +
|
| + with open(fpath, 'r') as f:
|
| + self.assertEqual(f.read(), 'abc' * self.halt_size,
|
| + 'File contents differ')
|
| +
|
| + @unittest.skipUnless(UsingCrcmodExtension(crcmod),
|
| + 'Sliced download requires fast crcmod.')
|
| + @SkipForS3('No sliced download support for S3.')
|
| + def test_cp_sliced_download_partial_resume(self):
|
| + """Test sliced download resumability when some components are finished."""
|
| + bucket_uri = self.CreateBucket()
|
| + object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
|
| + contents='abc' * self.halt_size)
|
| + fpath = self.CreateTempFile()
|
| + test_callback_file = self.CreateTempFile(
|
| + contents=pickle.dumps(_HaltOneComponentCopyCallbackHandler(5)))
|
| +
|
| + boto_config_for_test = [
|
| + ('GSUtil', 'resumable_threshold', str(self.halt_size)),
|
| + ('GSUtil', 'sliced_object_download_threshold', str(self.halt_size)),
|
| + ('GSUtil', 'sliced_object_download_max_components', '3')]
|
| +
|
| + with SetBotoConfigForTest(boto_config_for_test):
|
| + stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file,
|
| + suri(object_uri), suri(fpath)],
|
| + return_stderr=True, expected_status=1)
|
| + self.assertIn('not downloaded successfully', stderr)
|
| +
|
| + # Each tracker file should exist.
|
| + tracker_filenames = GetSlicedDownloadTrackerFilePaths(
|
| + StorageUrlFromString(fpath), self.test_api)
|
| + for tracker_filename in tracker_filenames:
|
| + self.assertTrue(os.path.isfile(tracker_filename))
|
| +
|
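| +    # On the second attempt the halted component resumes while the
|
| +    # already-finished components report as complete.
|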
| + stderr = self.RunGsUtil(['cp', suri(object_uri), fpath],
|
| + return_stderr=True)
|
| + self.assertIn('Resuming download', stderr)
|
| + self.assertIn('Download already complete', stderr)
|
| +
|
| + # Each tracker file should have been deleted.
|
| + tracker_filenames = GetSlicedDownloadTrackerFilePaths(
|
| + StorageUrlFromString(fpath), self.test_api)
|
| + for tracker_filename in tracker_filenames:
|
| + self.assertFalse(os.path.isfile(tracker_filename))
|
| +
|
| + with open(fpath, 'r') as f:
|
| + self.assertEqual(f.read(), 'abc' * self.halt_size,
|
| + 'File contents differ')
|
| +
|
| + @unittest.skipUnless(UsingCrcmodExtension(crcmod),
|
| + 'Sliced download requires fast crcmod.')
|
| + @SkipForS3('No sliced download support for S3.')
|
| + def test_cp_sliced_download_resume_content_differs(self):
|
| + """Tests differing file contents are detected by sliced downloads."""
|
| + bucket_uri = self.CreateBucket()
|
| + object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
|
| + contents='abc' * self.halt_size)
|
| + fpath = self.CreateTempFile(contents='')
|
| + test_callback_file = self.CreateTempFile(
|
| + contents=pickle.dumps(_HaltingCopyCallbackHandler(False, 5)))
|
| +
|
| + boto_config_for_test = [
|
| + ('GSUtil', 'resumable_threshold', str(self.halt_size)),
|
| + ('GSUtil', 'sliced_object_download_threshold', str(self.halt_size)),
|
| + ('GSUtil', 'sliced_object_download_max_components', '3')]
|
| +
|
| + with SetBotoConfigForTest(boto_config_for_test):
|
| + stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file,
|
| + suri(object_uri), suri(fpath)],
|
| + return_stderr=True, expected_status=1)
|
| + self.assertIn('not downloaded successfully', stderr)
|
| +
|
| + # Temporary download file should exist.
|
| + self.assertTrue(os.path.isfile(fpath + '_.gstmp'))
|
| +
|
| + # Each tracker file should exist.
|
| + tracker_filenames = GetSlicedDownloadTrackerFilePaths(
|
| + StorageUrlFromString(fpath), self.test_api)
|
| + for tracker_filename in tracker_filenames:
|
| + self.assertTrue(os.path.isfile(tracker_filename))
|
| +
|
| + with open(fpath + '_.gstmp', 'r+b') as f:
|
| + f.write('altered file contents')
|
| +
|
| + stderr = self.RunGsUtil(['cp', suri(object_uri), fpath],
|
| + return_stderr=True, expected_status=1)
|
| + self.assertIn('Resuming download', stderr)
|
| + self.assertIn('doesn\'t match cloud-supplied digest', stderr)
|
| + self.assertIn('HashMismatchException: crc32c', stderr)
|
| +
|
| + # Each tracker file should have been deleted.
|
| + tracker_filenames = GetSlicedDownloadTrackerFilePaths(
|
| + StorageUrlFromString(fpath), self.test_api)
|
| + for tracker_filename in tracker_filenames:
|
| + self.assertFalse(os.path.isfile(tracker_filename))
|
| +
|
| + # Temporary file should have been deleted due to hash mismatch.
|
| + self.assertFalse(os.path.isfile(fpath + '_.gstmp'))
|
| + # Final file should not exist.
|
| + self.assertFalse(os.path.isfile(fpath))
|
| +
|
| + @unittest.skipUnless(UsingCrcmodExtension(crcmod),
|
| + 'Sliced download requires fast crcmod.')
|
| + @SkipForS3('No sliced download support for S3.')
|
| + def test_cp_sliced_download_component_size_changed(self):
|
| + """Tests sliced download doesn't break when the boto config changes.
|
| +
|
| + If the number of components used changes cross-process, the download should
|
| + be restarted.
|
| + """
|
| + bucket_uri = self.CreateBucket()
|
| + object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
|
| + contents='abcd' * self.halt_size)
|
| + fpath = self.CreateTempFile()
|
| + test_callback_file = self.CreateTempFile(
|
| + contents=pickle.dumps(_HaltingCopyCallbackHandler(False, 5)))
|
| +
|
| + boto_config_for_test = [
|
| + ('GSUtil', 'resumable_threshold', str(self.halt_size)),
|
| + ('GSUtil', 'sliced_object_download_threshold', str(self.halt_size)),
|
| + ('GSUtil', 'sliced_object_download_component_size',
|
| + str(self.halt_size//4)),
|
| + ('GSUtil', 'sliced_object_download_max_components', '4')]
|
| +
|
| + with SetBotoConfigForTest(boto_config_for_test):
|
| + stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file,
|
| + suri(object_uri), suri(fpath)],
|
| + return_stderr=True, expected_status=1)
|
| + self.assertIn('not downloaded successfully', stderr)
|
| +
|
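| +    # Double the component size and halve the component count so the
|
| +    # existing tracker file no longer matches the slicing parameters.
|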
| + boto_config_for_test = [
|
| + ('GSUtil', 'resumable_threshold', str(self.halt_size)),
|
| + ('GSUtil', 'sliced_object_download_threshold', str(self.halt_size)),
|
| + ('GSUtil', 'sliced_object_download_component_size',
|
| + str(self.halt_size//2)),
|
| + ('GSUtil', 'sliced_object_download_max_components', '2')]
|
| +
|
| + with SetBotoConfigForTest(boto_config_for_test):
|
| + stderr = self.RunGsUtil(['cp', suri(object_uri), fpath],
|
| + return_stderr=True)
|
| + self.assertIn('Sliced download tracker file doesn\'t match ', stderr)
|
| + self.assertIn('Restarting download from scratch', stderr)
|
| + self.assertNotIn('Resuming download', stderr)
|
| +
|
| + @unittest.skipUnless(UsingCrcmodExtension(crcmod),
|
| + 'Sliced download requires fast crcmod.')
|
| + @SkipForS3('No sliced download support for S3.')
|
| + def test_cp_sliced_download_disabled_cross_process(self):
|
| + """Tests temporary files are not orphaned if sliced download is disabled.
|
| +
|
| + Specifically, temporary files should be deleted when the corresponding
|
| + non-sliced download is completed.
|
| + """
|
| + bucket_uri = self.CreateBucket()
|
| + object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
|
| + contents='abcd' * self.halt_size)
|
| + fpath = self.CreateTempFile()
|
| + test_callback_file = self.CreateTempFile(
|
| + contents=pickle.dumps(_HaltingCopyCallbackHandler(False, 5)))
|
| +
|
| + boto_config_for_test = [
|
| + ('GSUtil', 'resumable_threshold', str(self.halt_size)),
|
| + ('GSUtil', 'sliced_object_download_threshold', str(self.halt_size)),
|
| + ('GSUtil', 'sliced_object_download_max_components', '4')]
|
| +
|
| + with SetBotoConfigForTest(boto_config_for_test):
|
| + stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file,
|
| + suri(object_uri), suri(fpath)],
|
| + return_stderr=True, expected_status=1)
|
| + self.assertIn('not downloaded successfully', stderr)
|
| + # Temporary download file should exist.
|
| + self.assertTrue(os.path.isfile(fpath + '_.gstmp'))
|
| +
|
| + # Each tracker file should exist.
|
| + tracker_filenames = GetSlicedDownloadTrackerFilePaths(
|
| + StorageUrlFromString(fpath), self.test_api)
|
| + for tracker_filename in tracker_filenames:
|
| + self.assertTrue(os.path.isfile(tracker_filename))
|
| +
|
| +    # Disable sliced downloads by raising the threshold above the object size.
|
| + boto_config_for_test = [
|
| + ('GSUtil', 'resumable_threshold', str(self.halt_size)),
|
| + ('GSUtil', 'sliced_object_download_threshold', str(self.halt_size*5)),
|
| + ('GSUtil', 'sliced_object_download_max_components', '4')]
|
| +
|
| + with SetBotoConfigForTest(boto_config_for_test):
|
| + stderr = self.RunGsUtil(['cp', suri(object_uri), fpath],
|
| + return_stderr=True)
|
| + self.assertNotIn('Resuming download', stderr)
|
| + # Temporary download file should have been deleted.
|
| + self.assertFalse(os.path.isfile(fpath + '_.gstmp'))
|
| +
|
| + # Each tracker file should have been deleted.
|
| + for tracker_filename in tracker_filenames:
|
| + self.assertFalse(os.path.isfile(tracker_filename))
|
| + with open(fpath, 'r') as f:
|
| + self.assertEqual(f.read(), 'abcd' * self.halt_size)
|
| +
|
| @SkipForS3('No resumable upload support for S3.')
|
| def test_cp_resumable_upload_start_over_http_error(self):
|
| for start_over_error in (404, 410):
|
|
|