Unified Diff: tools/telemetry/third_party/gsutilz/gslib/tests/test_cp.py

Issue 1376593003: Roll gsutil version to 4.15. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 5 years, 3 months ago
Index: tools/telemetry/third_party/gsutilz/gslib/tests/test_cp.py
diff --git a/tools/telemetry/third_party/gsutilz/gslib/tests/test_cp.py b/tools/telemetry/third_party/gsutilz/gslib/tests/test_cp.py
index 7c44c366a614f71eb3c020660c5bf6876e0764b9..c216fb64a6a0345bfbf5f9e0b5ee8030ce1bec63 100644
--- a/tools/telemetry/third_party/gsutilz/gslib/tests/test_cp.py
+++ b/tools/telemetry/third_party/gsutilz/gslib/tests/test_cp.py
@@ -33,13 +33,14 @@ from apitools.base.py import exceptions as apitools_exceptions
import boto
from boto import storage_uri
from boto.exception import ResumableTransferDisposition
-from boto.exception import ResumableUploadException
from boto.exception import StorageResponseError
from boto.storage_uri import BucketStorageUri
+import crcmod
from gslib.cloud_api import ResumableDownloadException
from gslib.cloud_api import ResumableUploadException
from gslib.cloud_api import ResumableUploadStartOverException
+from gslib.commands.config import DEFAULT_SLICED_OBJECT_DOWNLOAD_THRESHOLD
from gslib.copy_helper import GetTrackerFilePath
from gslib.copy_helper import TrackerFileType
from gslib.cs_api_map import ApiSelector
@@ -52,19 +53,22 @@ from gslib.tests.testcase.integration_testcase import SkipForS3
from gslib.tests.util import GenerationFromURI as urigen
from gslib.tests.util import HAS_S3_CREDS
from gslib.tests.util import ObjectToURI as suri
-from gslib.tests.util import PerformsFileToObjectUpload
+from gslib.tests.util import SequentialAndParallelTransfer
from gslib.tests.util import SetBotoConfigForTest
from gslib.tests.util import unittest
from gslib.third_party.storage_apitools import storage_v1_messages as apitools_messages
from gslib.tracker_file import DeleteTrackerFile
from gslib.tracker_file import GetRewriteTrackerFilePath
+from gslib.tracker_file import GetSlicedDownloadTrackerFilePaths
from gslib.util import EIGHT_MIB
+from gslib.util import HumanReadableToBytes
from gslib.util import IS_WINDOWS
from gslib.util import MakeHumanReadable
from gslib.util import ONE_KIB
from gslib.util import ONE_MIB
from gslib.util import Retry
from gslib.util import START_CALLBACK_PER_BYTES
+from gslib.util import UsingCrcmodExtension
from gslib.util import UTF8
@@ -139,6 +143,24 @@ class _XMLResumableUploadStartOverCopyCallbackHandler(object):
ResumableTransferDisposition.START_OVER)
+class _HaltOneComponentCopyCallbackHandler(object):
+ """Test callback handler for stopping part of a sliced download."""
+
+ def __init__(self, halt_at_byte):
+ self._last_progress_byte = None
+ self._halt_at_byte = halt_at_byte
+
+ # pylint: disable=invalid-name
+ # pylint: disable=unused-argument
+ def call(self, current_progress_byte, total_size_unused):
+ """Forcibly exits if the passed the halting point since the last call."""
+ if (self._last_progress_byte is not None and
+ self._last_progress_byte < self._halt_at_byte < current_progress_byte):
+ sys.stderr.write('Halting transfer.\r\n')
+ raise ResumableDownloadException('Artifically halting download.')
+ self._last_progress_byte = current_progress_byte
+
+
class _DeleteBucketThenStartOverCopyCallbackHandler(object):
"""Test callback handler that deletes bucket then raises start-over."""
@@ -239,7 +261,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
contents = pkgutil.get_data('gslib', 'tests/test_data/%s' % name)
return self.CreateTempFile(file_name=name, contents=contents)
- @PerformsFileToObjectUpload
+ @SequentialAndParallelTransfer
def test_noclobber(self):
key_uri = self.CreateObject(contents='foo')
fpath = self.CreateTempFile(contents='bar')
@@ -259,7 +281,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
'%s://%s' % (self.default_provider, self.nonexistent_bucket_name))
stderr = self.RunGsUtil(['cp', fpath, invalid_bucket_uri],
expected_status=1, return_stderr=True)
- self.assertIn('does not exist.', stderr)
+ self.assertIn('does not exist', stderr)
def test_copy_in_cloud_noclobber(self):
bucket1_uri = self.CreateBucket()
@@ -275,7 +297,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
self.assertIn('Skipping existing item: %s' %
suri(bucket2_uri, key_uri.object_name), stderr)
- @PerformsFileToObjectUpload
+ @SequentialAndParallelTransfer
def test_streaming(self):
bucket_uri = self.CreateBucket()
stderr = self.RunGsUtil(['cp', '-', '%s' % suri(bucket_uri, 'foo')],
@@ -293,7 +315,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
# TODO: Implement a way to test both with and without using magic file.
- @PerformsFileToObjectUpload
+ @SequentialAndParallelTransfer
def test_detect_content_type(self):
"""Tests local detection of content type."""
bucket_uri = self.CreateBucket()
@@ -375,7 +397,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
_Check2()
@unittest.skipIf(IS_WINDOWS, 'magicfile is not available on Windows.')
- @PerformsFileToObjectUpload
+ @SequentialAndParallelTransfer
def test_magicfile_override(self):
"""Tests content type override with magicfile value."""
bucket_uri = self.CreateBucket()
@@ -393,7 +415,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
self.assertRegexpMatches(stdout, r'Content-Type:\s+%s' % content_type)
_Check1()
- @PerformsFileToObjectUpload
+ @SequentialAndParallelTransfer
def test_content_type_mismatches(self):
"""Tests overriding content type when it does not match the file type."""
bucket_uri = self.CreateBucket()
@@ -429,7 +451,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
self.assertRegexpMatches(stdout, r'Content-Type:\s+image/gif')
_Check3()
- @PerformsFileToObjectUpload
+ @SequentialAndParallelTransfer
def test_content_type_header_case_insensitive(self):
"""Tests that content type header is treated with case insensitivity."""
bucket_uri = self.CreateBucket()
@@ -459,7 +481,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
self.assertNotRegexpMatches(stdout, r'image/gif,\s*image/gif')
_Check2()
- @PerformsFileToObjectUpload
+ @SequentialAndParallelTransfer
def test_other_headers(self):
"""Tests that non-content-type headers are applied successfully on copy."""
bucket_uri = self.CreateBucket()
@@ -481,7 +503,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
self.assertRegexpMatches(stdout, r'Cache-Control\s*:\s*public,max-age=12')
self.assertRegexpMatches(stdout, r'Metadata:\s*1:\s*abcd')
- @PerformsFileToObjectUpload
+ @SequentialAndParallelTransfer
def test_versioning(self):
"""Tests copy with versioning."""
bucket_uri = self.CreateVersionedBucket()
@@ -524,11 +546,20 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
expected_status=1)
self.assertIn('cannot be the destination for gsutil cp', stderr)
+ def test_versioning_no_parallelism(self):
+ """Tests that copy all-versions errors when parallelism is enabled."""
+ stderr = self.RunGsUtil(
+ ['-m', 'cp', '-A', suri(self.nonexistent_bucket_name, 'foo'),
+ suri(self.nonexistent_bucket_name, 'bar')],
+ expected_status=1, return_stderr=True)
+ self.assertIn('-m option is not supported with the cp -A flag', stderr)
+
@SkipForS3('S3 lists versioned objects in reverse timestamp order.')
def test_recursive_copying_versioned_bucket(self):
- """Tests that cp -R with versioned buckets copies all versions in order."""
+ """Tests cp -R with versioned buckets."""
bucket1_uri = self.CreateVersionedBucket()
bucket2_uri = self.CreateVersionedBucket()
+ bucket3_uri = self.CreateVersionedBucket()
# Write two versions of an object to the bucket1.
self.CreateObject(bucket_uri=bucket1_uri, object_name='k', contents='data0')
@@ -537,9 +568,12 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
self.AssertNObjectsInBucket(bucket1_uri, 2, versioned=True)
self.AssertNObjectsInBucket(bucket2_uri, 0, versioned=True)
+ self.AssertNObjectsInBucket(bucket3_uri, 0, versioned=True)
# Recursively copy to second versioned bucket.
- self.RunGsUtil(['cp', '-R', suri(bucket1_uri, '*'), suri(bucket2_uri)])
+ # -A flag should copy all versions in order.
+ self.RunGsUtil(['cp', '-R', '-A', suri(bucket1_uri, '*'),
+ suri(bucket2_uri)])
# Use @Retry as hedge against bucket listing eventual consistency.
@Retry(AssertionError, tries=3, timeout_secs=1)
@@ -570,7 +604,31 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
self.assertEquals(storage_uri(uri_str2).object_name, 'k')
_Check2()
- @PerformsFileToObjectUpload
+ # Recursively copy to second versioned bucket with no -A flag.
+ # This should copy only the live object.
+ self.RunGsUtil(['cp', '-R', suri(bucket1_uri, '*'),
+ suri(bucket3_uri)])
+
+ # Use @Retry as hedge against bucket listing eventual consistency.
+ @Retry(AssertionError, tries=3, timeout_secs=1)
+ def _Check3():
+ """Validates the results of the cp -R."""
+ listing1 = self.RunGsUtil(['ls', '-la', suri(bucket1_uri)],
+ return_stdout=True).split('\n')
+ listing2 = self.RunGsUtil(['ls', '-la', suri(bucket3_uri)],
+ return_stdout=True).split('\n')
+ # 2 lines of listing output, 1 summary line, 1 empty line from \n split.
+ self.assertEquals(len(listing1), 4)
+ # 1 line of listing output, 1 summary line, 1 empty line from \n split.
+ self.assertEquals(len(listing2), 3)
+
+ # Live (second) object in bucket 1 should match the single live object.
+ size1, _, uri_str1, _ = listing2[0].split()
+ self.assertEquals(size1, str(len('longer_data1')))
+ self.assertEquals(storage_uri(uri_str1).object_name, 'k')
+ _Check3()
+
+ @SequentialAndParallelTransfer
@SkipForS3('Preconditions not supported for S3.')
def test_cp_generation_zero_match(self):
"""Tests that cp handles an object-not-exists precondition header."""
@@ -591,7 +649,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
return_stderr=True, expected_status=1)
self.assertIn('PreconditionException', stderr)
- @PerformsFileToObjectUpload
+ @SequentialAndParallelTransfer
@SkipForS3('Preconditions not supported for S3.')
def test_cp_v_generation_match(self):
"""Tests that cp -v option handles the if-generation-match header."""
@@ -623,7 +681,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
self.assertIn('Specifying x-goog-if-generation-match is not supported '
'with cp -n', stderr)
- @PerformsFileToObjectUpload
+ @SequentialAndParallelTransfer
def test_cp_nv(self):
"""Tests that cp -nv works when skipping existing file."""
bucket_uri = self.CreateVersionedBucket()
@@ -640,7 +698,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
return_stderr=True)
self.assertIn('Skipping existing item:', stderr)
- @PerformsFileToObjectUpload
+ @SequentialAndParallelTransfer
@SkipForS3('S3 lists versioned objects in reverse timestamp order.')
def test_cp_v_option(self):
""""Tests that cp -v returns the created object's version-specific URI."""
@@ -699,7 +757,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
self.assertEqual(created_uri, lines[-2])
_Check1()
- @PerformsFileToObjectUpload
+ @SequentialAndParallelTransfer
def test_stdin_args(self):
"""Tests cp with the -I option."""
tmpdir = self.CreateTempDir()
@@ -844,7 +902,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
self.assertEqual(public_read_acl, new_acl_json)
_Check()
- @PerformsFileToObjectUpload
+ @SequentialAndParallelTransfer
def test_canned_acl_upload(self):
"""Tests uploading a file with a canned ACL."""
bucket1_uri = self.CreateBucket()
@@ -889,7 +947,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
stdout = self.RunGsUtil(['cp', fpath, '-'], return_stdout=True)
self.assertIn(contents, stdout)
- @PerformsFileToObjectUpload
+ @SequentialAndParallelTransfer
def test_cp_zero_byte_file(self):
dst_bucket_uri = self.CreateBucket()
src_dir = self.CreateTempDir()
@@ -909,7 +967,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
self.assertTrue(os.stat(download_path))
def test_copy_bucket_to_bucket(self):
- """Tests that recursively copying from bucket to bucket.
+ """Tests recursively copying from bucket to bucket.
This should produce identically named objects (and not, in particular,
destination objects named by the version-specific URI from source objects).
@@ -978,9 +1036,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
key_uri.set_contents_from_string('')
self.AssertNObjectsInBucket(src_bucket_uri, 3)
- stderr = self.RunGsUtil(['cp', '-R', suri(src_bucket_uri), dst_dir],
- return_stderr=True)
- self.assertIn('Skipping cloud sub-directory placeholder object', stderr)
+ self.RunGsUtil(['cp', '-R', suri(src_bucket_uri), dst_dir])
dir_list = []
for dirname, _, filenames in os.walk(dst_dir):
for filename in filenames:
@@ -1025,12 +1081,12 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
@unittest.skipIf(IS_WINDOWS,
'Unicode handling on Windows requires mods to site-packages')
- @PerformsFileToObjectUpload
+ @SequentialAndParallelTransfer
def test_cp_manifest_upload_unicode(self):
return self._ManifestUpload('foo-unicöde', 'bar-unicöde',
'manifest-unicöde')
- @PerformsFileToObjectUpload
+ @SequentialAndParallelTransfer
def test_cp_manifest_upload(self):
"""Tests uploading with a mnifest file."""
return self._ManifestUpload('foo', 'bar', 'manifest')
@@ -1071,7 +1127,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
self.assertEqual(int(results[7]), 3) # Bytes Transferred
self.assertEqual(results[8], 'OK') # Result
- @PerformsFileToObjectUpload
+ @SequentialAndParallelTransfer
def test_cp_manifest_download(self):
"""Tests downloading with a manifest file."""
key_uri = self.CreateObject(contents='foo')
@@ -1097,14 +1153,13 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
start_date = datetime.datetime.strptime(results[2], date_format)
end_date = datetime.datetime.strptime(results[3], date_format)
self.assertEqual(end_date > start_date, True)
- self.assertEqual(results[4], 'rL0Y20zC+Fzt72VPzMSk2A==') # md5
self.assertEqual(int(results[6]), 3) # Source Size
# Bytes transferred might be more than 3 if the file was gzipped, since
# the minimum gzip header is 10 bytes.
self.assertGreaterEqual(int(results[7]), 3) # Bytes Transferred
self.assertEqual(results[8], 'OK') # Result
- @PerformsFileToObjectUpload
+ @SequentialAndParallelTransfer
def test_copy_unicode_non_ascii_filename(self):
key_uri = self.CreateObject(contents='foo')
# Make file large enough to cause a resumable upload (which hashes filename
@@ -1122,6 +1177,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
# such files (so, failed that test). Given that, we decided to remove the
# test.
+ @SequentialAndParallelTransfer
def test_gzip_upload_and_download(self):
bucket_uri = self.CreateBucket()
contents = 'x' * 10000
@@ -1158,6 +1214,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
self.assertIn('Copying file:', stderr)
self.AssertNObjectsInBucket(bucket_uri, 1)
+ @SequentialAndParallelTransfer
def test_cp_object_ending_with_slash(self):
"""Tests that cp works with object names ending with slash."""
tmpdir = self.CreateTempDir()
@@ -1214,6 +1271,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
self.RunGsUtil(['-m', 'cp', wildcard_uri, suri(bucket_uri)])
self.AssertNObjectsInBucket(bucket_uri, num_test_files)
+ @SequentialAndParallelTransfer
def test_cp_duplicate_source_args(self):
"""Tests that cp -m works when a source argument is provided twice."""
object_contents = 'edge'
@@ -1416,7 +1474,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
@NotParallelizable
@SkipForS3('No resumable upload support for S3.')
@unittest.skipIf(IS_WINDOWS, 'chmod on dir unsupported on Windows.')
- @PerformsFileToObjectUpload
+ @SequentialAndParallelTransfer
def test_cp_unwritable_tracker_file(self):
"""Tests a resumable upload with an unwritable tracker file."""
bucket_uri = self.CreateBucket()
@@ -1443,6 +1501,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
# interferes with any parallel running tests that use the tracker directory.
@NotParallelizable
@unittest.skipIf(IS_WINDOWS, 'chmod on dir unsupported on Windows.')
+ @SequentialAndParallelTransfer
def test_cp_unwritable_tracker_file_download(self):
"""Tests downloads with an unwritable tracker file."""
object_uri = self.CreateObject(contents='foo' * ONE_KIB)
@@ -1491,6 +1550,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
return_stderr=True)
self.assertIn('Resuming download', stderr)
+ @SequentialAndParallelTransfer
def test_cp_resumable_download_etag_differs(self):
"""Tests that download restarts the file when the source object changes.
@@ -1498,7 +1558,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
"""
bucket_uri = self.CreateBucket()
object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
- contents='a' * self.halt_size)
+ contents='abc' * self.halt_size)
fpath = self.CreateTempFile()
test_callback_file = self.CreateTempFile(
contents=pickle.dumps(_HaltingCopyCallbackHandler(False, 5)))
@@ -1517,6 +1577,44 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
return_stderr=True)
self.assertNotIn('Resuming download', stderr)
+ # TODO: Enable this test for sequential downloads when their tracker files are
+ # modified to contain the source object generation.
+ @unittest.skipUnless(UsingCrcmodExtension(crcmod),
+ 'Sliced download requires fast crcmod.')
+ @SkipForS3('No sliced download support for S3.')
+ def test_cp_resumable_download_generation_differs(self):
+ """Tests that a resumable download restarts if the generation differs."""
+ bucket_uri = self.CreateBucket()
+ file_contents = 'abcd' * self.halt_size
+ object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
+ contents=file_contents)
+ fpath = self.CreateTempFile()
+
+ test_callback_file = self.CreateTempFile(
+ contents=pickle.dumps(_HaltingCopyCallbackHandler(False, 5)))
+
+ boto_config_for_test = [
+ ('GSUtil', 'resumable_threshold', str(self.halt_size)),
+ ('GSUtil', 'sliced_object_download_threshold', str(self.halt_size)),
+ ('GSUtil', 'sliced_object_download_max_components', '3')]
+
+ with SetBotoConfigForTest(boto_config_for_test):
+ stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file,
+ suri(object_uri), suri(fpath)],
+ return_stderr=True, expected_status=1)
+ self.assertIn('Artifically halting download.', stderr)
+
+ # Overwrite the object with an identical object, increasing
+ # the generation but leaving other metadata the same.
+ identical_file = self.CreateTempFile(contents=file_contents)
+ self.RunGsUtil(['cp', suri(identical_file), suri(object_uri)])
+
+ stderr = self.RunGsUtil(['cp', suri(object_uri), suri(fpath)],
+ return_stderr=True)
+ self.assertIn('Restarting download from scratch', stderr)
+ with open(fpath, 'r') as f:
+ self.assertEqual(f.read(), file_contents, 'File contents differ')
+
def test_cp_resumable_download_file_larger(self):
"""Tests download deletes the tracker file when existing file is larger."""
bucket_uri = self.CreateBucket()
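Note: the hunks below rely on gsutil's partial-download convention, where bytes are first written to a temporary file named <dest>_.gstmp and only renamed to the final destination after the download and hash validation complete. A minimal sketch of that convention, with hypothetical paths:

  fpath = '/tmp/foo'                      # final destination path
  temp_download_file = fpath + '_.gstmp'  # partial bytes accumulate here
  # On success gsutil renames the temp file onto fpath; on a hash
  # mismatch the temp file is deleted and fpath is never created.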
@@ -1531,13 +1629,12 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
suri(object_uri), fpath],
expected_status=1, return_stderr=True)
self.assertIn('Artifically halting download.', stderr)
- with open(fpath, 'w') as larger_file:
+ with open(fpath + '_.gstmp', 'w') as larger_file:
for _ in range(self.halt_size * 2):
larger_file.write('a')
stderr = self.RunGsUtil(['cp', suri(object_uri), fpath],
expected_status=1, return_stderr=True)
self.assertNotIn('Resuming download', stderr)
- self.assertIn('is larger', stderr)
self.assertIn('Deleting tracker file', stderr)
def test_cp_resumable_download_content_differs(self):
@@ -1550,7 +1647,11 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
"""
bucket_uri = self.CreateBucket()
tmp_dir = self.CreateTempDir()
- fpath = self.CreateTempFile(tmpdir=tmp_dir, contents='abcd' * ONE_KIB)
+ fpath = self.CreateTempFile(tmpdir=tmp_dir)
+ temp_download_file = fpath + '_.gstmp'
+ with open(temp_download_file, 'w') as fp:
+ fp.write('abcd' * ONE_KIB)
+
object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
contents='efgh' * ONE_KIB)
stdout = self.RunGsUtil(['ls', '-L', suri(object_uri)], return_stdout=True)
@@ -1569,11 +1670,13 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
with SetBotoConfigForTest([boto_config_for_test]):
stderr = self.RunGsUtil(['cp', suri(object_uri), fpath],
return_stderr=True, expected_status=1)
- self.assertIn('Download already complete for file', stderr)
+ self.assertIn('Download already complete', stderr)
self.assertIn('doesn\'t match cloud-supplied digest', stderr)
# File and tracker file should be deleted.
- self.assertFalse(os.path.isfile(fpath))
+ self.assertFalse(os.path.isfile(temp_download_file))
self.assertFalse(os.path.isfile(tracker_filename))
+ # Permanent file should not have been created.
+ self.assertFalse(os.path.isfile(fpath))
finally:
if os.path.exists(tracker_filename):
os.unlink(tracker_filename)
@@ -1582,8 +1685,12 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
"""Tests download no-ops when tracker file matches existing file."""
bucket_uri = self.CreateBucket()
tmp_dir = self.CreateTempDir()
+ fpath = self.CreateTempFile(tmpdir=tmp_dir)
matching_contents = 'abcd' * ONE_KIB
- fpath = self.CreateTempFile(tmpdir=tmp_dir, contents=matching_contents)
+ temp_download_file = fpath + '_.gstmp'
+ with open(temp_download_file, 'w') as fp:
+ fp.write(matching_contents)
+
object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
contents=matching_contents)
stdout = self.RunGsUtil(['ls', '-L', suri(object_uri)], return_stdout=True)
@@ -1601,7 +1708,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
with SetBotoConfigForTest([boto_config_for_test]):
stderr = self.RunGsUtil(['cp', suri(object_uri), fpath],
return_stderr=True)
- self.assertIn('Download already complete for file', stderr)
+ self.assertIn('Download already complete', stderr)
# Tracker file should be removed after successful hash validation.
self.assertFalse(os.path.isfile(tracker_filename))
finally:
@@ -1643,6 +1750,7 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
if os.path.exists(tracker_filename):
os.unlink(tracker_filename)
+ @SequentialAndParallelTransfer
def test_cp_resumable_download_gzip(self):
"""Tests that download can be resumed successfully with a gzipped file."""
# Generate some reasonably incompressible data. This compresses to a bit
@@ -1683,12 +1791,26 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
suri(object_uri), suri(fpath2)],
return_stderr=True, expected_status=1)
self.assertIn('Artifically halting download.', stderr)
- tracker_filename = GetTrackerFilePath(
- StorageUrlFromString(fpath2), TrackerFileType.DOWNLOAD, self.test_api)
- self.assertTrue(os.path.isfile(tracker_filename))
self.assertIn('Downloading to temp gzip filename', stderr)
+
+ # Tracker files will have different names depending on if we are
+ # downloading sequentially or in parallel.
+ sliced_download_threshold = HumanReadableToBytes(
+ boto.config.get('GSUtil', 'sliced_object_download_threshold',
+ DEFAULT_SLICED_OBJECT_DOWNLOAD_THRESHOLD))
+ sliced_download = (len(contents) > sliced_download_threshold
+ and sliced_download_threshold > 0
+ and UsingCrcmodExtension(crcmod))
+ if sliced_download:
+ trackerfile_type = TrackerFileType.SLICED_DOWNLOAD
+ else:
+ trackerfile_type = TrackerFileType.DOWNLOAD
+ tracker_filename = GetTrackerFilePath(
+ StorageUrlFromString(fpath2), trackerfile_type, self.test_api)
+
# We should have a temporary gzipped file, a tracker file, and no
# final file yet.
+ self.assertTrue(os.path.isfile(tracker_filename))
self.assertTrue(os.path.isfile('%s_.gztmp' % fpath2))
stderr = self.RunGsUtil(['cp', suri(object_uri), suri(fpath2)],
return_stderr=True)
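Note: the sliced-vs-sequential decision computed above mirrors gsutil's runtime check; restated as a standalone predicate (name hypothetical):

  def _uses_sliced_download(object_size, threshold_bytes, have_fast_crcmod):
    # Sliced downloads apply only when the feature is enabled
    # (threshold > 0), the object exceeds the threshold, and the
    # fast C crcmod extension is available.
    return (threshold_bytes > 0 and object_size > threshold_bytes
            and have_fast_crcmod)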
@@ -1698,6 +1820,31 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
self.assertFalse(os.path.isfile(tracker_filename))
self.assertFalse(os.path.isfile('%s_.gztmp' % fpath2))
+ @SequentialAndParallelTransfer
+ def test_cp_resumable_download_check_hashes_never(self):
+ """Tests that resumble downloads work with check_hashes = never."""
+ bucket_uri = self.CreateBucket()
+ contents = 'abcd' * self.halt_size
+ object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
+ contents=contents)
+ fpath = self.CreateTempFile()
+ test_callback_file = self.CreateTempFile(
+ contents=pickle.dumps(_HaltingCopyCallbackHandler(False, 5)))
+
+ boto_config_for_test = [('GSUtil', 'resumable_threshold', str(ONE_KIB)),
+ ('GSUtil', 'check_hashes', 'never')]
+ with SetBotoConfigForTest(boto_config_for_test):
+ stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file,
+ suri(object_uri), fpath],
+ expected_status=1, return_stderr=True)
+ self.assertIn('Artifically halting download.', stderr)
+ stderr = self.RunGsUtil(['cp', suri(object_uri), fpath],
+ return_stderr=True)
+ self.assertIn('Resuming download', stderr)
+ self.assertIn('Found no hashes to validate object downloaded', stderr)
+ with open(fpath, 'r') as f:
+ self.assertEqual(f.read(), contents, 'File contents did not match.')
+
@SkipForS3('No resumable upload support for S3.')
def test_cp_resumable_upload_bucket_deleted(self):
"""Tests that a not found exception is raised if bucket no longer exists."""
@@ -1715,6 +1862,312 @@ class TestCp(testcase.GsUtilIntegrationTestCase):
self.assertIn('Deleting bucket', stderr)
self.assertIn('bucket does not exist', stderr)
+ @SkipForS3('No sliced download support for S3.')
+ def test_cp_sliced_download(self):
+ """Tests that sliced object download works in the general case."""
+ bucket_uri = self.CreateBucket()
+ object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
+ contents='abc' * ONE_KIB)
+ fpath = self.CreateTempFile()
+
+ # Force fast crcmod to return True to test the basic sliced download
+ # scenario, ensuring that if the user installs crcmod, it will work.
+ boto_config_for_test = [
+ ('GSUtil', 'resumable_threshold', str(ONE_KIB)),
+ ('GSUtil', 'test_assume_fast_crcmod', 'True'),
+ ('GSUtil', 'sliced_object_download_threshold', str(ONE_KIB)),
+ ('GSUtil', 'sliced_object_download_max_components', '3')]
+
+ with SetBotoConfigForTest(boto_config_for_test):
+ self.RunGsUtil(['cp', suri(object_uri), fpath])
+
+ # Each tracker file should have been deleted.
+ tracker_filenames = GetSlicedDownloadTrackerFilePaths(
+ StorageUrlFromString(fpath), self.test_api)
+ for tracker_filename in tracker_filenames:
+ self.assertFalse(os.path.isfile(tracker_filename))
+
+ with open(fpath, 'r') as f:
+ self.assertEqual(f.read(), 'abc' * ONE_KIB, 'File contents differ')
+
+ @unittest.skipUnless(UsingCrcmodExtension(crcmod),
+ 'Sliced download requires fast crcmod.')
+ @SkipForS3('No sliced download support for S3.')
+ def test_cp_unresumable_sliced_download(self):
+ """Tests sliced download works when resumability is disabled."""
+ bucket_uri = self.CreateBucket()
+ object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
+ contents='abcd' * self.halt_size)
+ fpath = self.CreateTempFile()
+ test_callback_file = self.CreateTempFile(
+ contents=pickle.dumps(_HaltingCopyCallbackHandler(False, 5)))
+
+ boto_config_for_test = [
+ ('GSUtil', 'resumable_threshold', str(self.halt_size*5)),
+ ('GSUtil', 'sliced_object_download_threshold', str(self.halt_size)),
+ ('GSUtil', 'sliced_object_download_max_components', '4')]
+
+ with SetBotoConfigForTest(boto_config_for_test):
+ stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file,
+ suri(object_uri), suri(fpath)],
+ return_stderr=True, expected_status=1)
+ self.assertIn('not downloaded successfully', stderr)
+ # Temporary download file should exist.
+ self.assertTrue(os.path.isfile(fpath + '_.gstmp'))
+
+ # No tracker files should exist.
+ tracker_filenames = GetSlicedDownloadTrackerFilePaths(
+ StorageUrlFromString(fpath), self.test_api)
+ for tracker_filename in tracker_filenames:
+ self.assertFalse(os.path.isfile(tracker_filename))
+
+ # Perform the entire download, without resuming.
+ with SetBotoConfigForTest(boto_config_for_test):
+ stderr = self.RunGsUtil(['cp', suri(object_uri), suri(fpath)],
+ return_stderr=True)
+ self.assertNotIn('Resuming download', stderr)
+ # Temporary download file should have been deleted.
+ self.assertFalse(os.path.isfile(fpath + '_.gstmp'))
+ with open(fpath, 'r') as f:
+ self.assertEqual(f.read(), 'abcd' * self.halt_size,
+ 'File contents differ')
+
+ @unittest.skipUnless(UsingCrcmodExtension(crcmod),
+ 'Sliced download requires fast crcmod.')
+ @SkipForS3('No sliced download support for S3.')
+ def test_cp_sliced_download_resume(self):
+ """Tests that sliced object download is resumable."""
+ bucket_uri = self.CreateBucket()
+ object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
+ contents='abc' * self.halt_size)
+ fpath = self.CreateTempFile()
+ test_callback_file = self.CreateTempFile(
+ contents=pickle.dumps(_HaltingCopyCallbackHandler(False, 5)))
+
+ boto_config_for_test = [
+ ('GSUtil', 'resumable_threshold', str(self.halt_size)),
+ ('GSUtil', 'sliced_object_download_threshold', str(self.halt_size)),
+ ('GSUtil', 'sliced_object_download_max_components', '3')]
+
+ with SetBotoConfigForTest(boto_config_for_test):
+ stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file,
+ suri(object_uri), suri(fpath)],
+ return_stderr=True, expected_status=1)
+ self.assertIn('not downloaded successfully', stderr)
+
+ # Each tracker file should exist.
+ tracker_filenames = GetSlicedDownloadTrackerFilePaths(
+ StorageUrlFromString(fpath), self.test_api)
+ for tracker_filename in tracker_filenames:
+ self.assertTrue(os.path.isfile(tracker_filename))
+
+ stderr = self.RunGsUtil(['cp', suri(object_uri), fpath],
+ return_stderr=True)
+ self.assertIn('Resuming download', stderr)
+
+ # Each tracker file should have been deleted.
+ tracker_filenames = GetSlicedDownloadTrackerFilePaths(
+ StorageUrlFromString(fpath), self.test_api)
+ for tracker_filename in tracker_filenames:
+ self.assertFalse(os.path.isfile(tracker_filename))
+
+ with open(fpath, 'r') as f:
+ self.assertEqual(f.read(), 'abc' * self.halt_size,
+ 'File contents differ')
+
+ @unittest.skipUnless(UsingCrcmodExtension(crcmod),
+ 'Sliced download requires fast crcmod.')
+ @SkipForS3('No sliced download support for S3.')
+ def test_cp_sliced_download_partial_resume(self):
+ """Test sliced download resumability when some components are finished."""
+ bucket_uri = self.CreateBucket()
+ object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
+ contents='abc' * self.halt_size)
+ fpath = self.CreateTempFile()
+ test_callback_file = self.CreateTempFile(
+ contents=pickle.dumps(_HaltOneComponentCopyCallbackHandler(5)))
+
+ boto_config_for_test = [
+ ('GSUtil', 'resumable_threshold', str(self.halt_size)),
+ ('GSUtil', 'sliced_object_download_threshold', str(self.halt_size)),
+ ('GSUtil', 'sliced_object_download_max_components', '3')]
+
+ with SetBotoConfigForTest(boto_config_for_test):
+ stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file,
+ suri(object_uri), suri(fpath)],
+ return_stderr=True, expected_status=1)
+ self.assertIn('not downloaded successfully', stderr)
+
+ # Each tracker file should exist.
+ tracker_filenames = GetSlicedDownloadTrackerFilePaths(
+ StorageUrlFromString(fpath), self.test_api)
+ for tracker_filename in tracker_filenames:
+ self.assertTrue(os.path.isfile(tracker_filename))
+
+ stderr = self.RunGsUtil(['cp', suri(object_uri), fpath],
+ return_stderr=True)
+ self.assertIn('Resuming download', stderr)
+ self.assertIn('Download already complete', stderr)
+
+ # Each tracker file should have been deleted.
+ tracker_filenames = GetSlicedDownloadTrackerFilePaths(
+ StorageUrlFromString(fpath), self.test_api)
+ for tracker_filename in tracker_filenames:
+ self.assertFalse(os.path.isfile(tracker_filename))
+
+ with open(fpath, 'r') as f:
+ self.assertEqual(f.read(), 'abc' * self.halt_size,
+ 'File contents differ')
+
+ @unittest.skipUnless(UsingCrcmodExtension(crcmod),
+ 'Sliced download requires fast crcmod.')
+ @SkipForS3('No sliced download support for S3.')
+ def test_cp_sliced_download_resume_content_differs(self):
+ """Tests differing file contents are detected by sliced downloads."""
+ bucket_uri = self.CreateBucket()
+ object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
+ contents='abc' * self.halt_size)
+ fpath = self.CreateTempFile(contents='')
+ test_callback_file = self.CreateTempFile(
+ contents=pickle.dumps(_HaltingCopyCallbackHandler(False, 5)))
+
+ boto_config_for_test = [
+ ('GSUtil', 'resumable_threshold', str(self.halt_size)),
+ ('GSUtil', 'sliced_object_download_threshold', str(self.halt_size)),
+ ('GSUtil', 'sliced_object_download_max_components', '3')]
+
+ with SetBotoConfigForTest(boto_config_for_test):
+ stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file,
+ suri(object_uri), suri(fpath)],
+ return_stderr=True, expected_status=1)
+ self.assertIn('not downloaded successfully', stderr)
+
+ # Temporary download file should exist.
+ self.assertTrue(os.path.isfile(fpath + '_.gstmp'))
+
+ # Each tracker file should exist.
+ tracker_filenames = GetSlicedDownloadTrackerFilePaths(
+ StorageUrlFromString(fpath), self.test_api)
+ for tracker_filename in tracker_filenames:
+ self.assertTrue(os.path.isfile(tracker_filename))
+
+ with open(fpath + '_.gstmp', 'r+b') as f:
+ f.write('altered file contents')
+
+ stderr = self.RunGsUtil(['cp', suri(object_uri), fpath],
+ return_stderr=True, expected_status=1)
+ self.assertIn('Resuming download', stderr)
+ self.assertIn('doesn\'t match cloud-supplied digest', stderr)
+ self.assertIn('HashMismatchException: crc32c', stderr)
+
+ # Each tracker file should have been deleted.
+ tracker_filenames = GetSlicedDownloadTrackerFilePaths(
+ StorageUrlFromString(fpath), self.test_api)
+ for tracker_filename in tracker_filenames:
+ self.assertFalse(os.path.isfile(tracker_filename))
+
+ # Temporary file should have been deleted due to hash mismatch.
+ self.assertFalse(os.path.isfile(fpath + '_.gstmp'))
+ # Final file should not exist.
+ self.assertFalse(os.path.isfile(fpath))
+
+ @unittest.skipUnless(UsingCrcmodExtension(crcmod),
+ 'Sliced download requires fast crcmod.')
+ @SkipForS3('No sliced download support for S3.')
+ def test_cp_sliced_download_component_size_changed(self):
+ """Tests sliced download doesn't break when the boto config changes.
+
+ If the number of components used changes cross-process, the download should
+ be restarted.
+ """
+ bucket_uri = self.CreateBucket()
+ object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
+ contents='abcd' * self.halt_size)
+ fpath = self.CreateTempFile()
+ test_callback_file = self.CreateTempFile(
+ contents=pickle.dumps(_HaltingCopyCallbackHandler(False, 5)))
+
+ boto_config_for_test = [
+ ('GSUtil', 'resumable_threshold', str(self.halt_size)),
+ ('GSUtil', 'sliced_object_download_threshold', str(self.halt_size)),
+ ('GSUtil', 'sliced_object_download_component_size',
+ str(self.halt_size//4)),
+ ('GSUtil', 'sliced_object_download_max_components', '4')]
+
+ with SetBotoConfigForTest(boto_config_for_test):
+ stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file,
+ suri(object_uri), suri(fpath)],
+ return_stderr=True, expected_status=1)
+ self.assertIn('not downloaded successfully', stderr)
+
+ boto_config_for_test = [
+ ('GSUtil', 'resumable_threshold', str(self.halt_size)),
+ ('GSUtil', 'sliced_object_download_threshold', str(self.halt_size)),
+ ('GSUtil', 'sliced_object_download_component_size',
+ str(self.halt_size//2)),
+ ('GSUtil', 'sliced_object_download_max_components', '2')]
+
+ with SetBotoConfigForTest(boto_config_for_test):
+ stderr = self.RunGsUtil(['cp', suri(object_uri), fpath],
+ return_stderr=True)
+ self.assertIn('Sliced download tracker file doesn\'t match ', stderr)
+ self.assertIn('Restarting download from scratch', stderr)
+ self.assertNotIn('Resuming download', stderr)
+
+ @unittest.skipUnless(UsingCrcmodExtension(crcmod),
+ 'Sliced download requires fast crcmod.')
+ @SkipForS3('No sliced download support for S3.')
+ def test_cp_sliced_download_disabled_cross_process(self):
+ """Tests temporary files are not orphaned if sliced download is disabled.
+
+ Specifically, temporary files should be deleted when the corresponding
+ non-sliced download is completed.
+ """
+ bucket_uri = self.CreateBucket()
+ object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
+ contents='abcd' * self.halt_size)
+ fpath = self.CreateTempFile()
+ test_callback_file = self.CreateTempFile(
+ contents=pickle.dumps(_HaltingCopyCallbackHandler(False, 5)))
+
+ boto_config_for_test = [
+ ('GSUtil', 'resumable_threshold', str(self.halt_size)),
+ ('GSUtil', 'sliced_object_download_threshold', str(self.halt_size)),
+ ('GSUtil', 'sliced_object_download_max_components', '4')]
+
+ with SetBotoConfigForTest(boto_config_for_test):
+ stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file,
+ suri(object_uri), suri(fpath)],
+ return_stderr=True, expected_status=1)
+ self.assertIn('not downloaded successfully', stderr)
+ # Temporary download file should exist.
+ self.assertTrue(os.path.isfile(fpath + '_.gstmp'))
+
+ # Each tracker file should exist.
+ tracker_filenames = GetSlicedDownloadTrackerFilePaths(
+ StorageUrlFromString(fpath), self.test_api)
+ for tracker_filename in tracker_filenames:
+ self.assertTrue(os.path.isfile(tracker_filename))
+
+ # Disable sliced downloads by increasing the threshold.
+ boto_config_for_test = [
+ ('GSUtil', 'resumable_threshold', str(self.halt_size)),
+ ('GSUtil', 'sliced_object_download_threshold', str(self.halt_size*5)),
+ ('GSUtil', 'sliced_object_download_max_components', '4')]
+
+ with SetBotoConfigForTest(boto_config_for_test):
+ stderr = self.RunGsUtil(['cp', suri(object_uri), fpath],
+ return_stderr=True)
+ self.assertNotIn('Resuming download', stderr)
+ # Temporary download file should have been deleted.
+ self.assertFalse(os.path.isfile(fpath + '_.gstmp'))
+
+ # Each tracker file should have been deleted.
+ for tracker_filename in tracker_filenames:
+ self.assertFalse(os.path.isfile(tracker_filename))
+ with open(fpath, 'r') as f:
+ self.assertEqual(f.read(), 'abcd' * self.halt_size)
+
@SkipForS3('No resumable upload support for S3.')
def test_cp_resumable_upload_start_over_http_error(self):
for start_over_error in (404, 410):
