Index: third_party/gsutil/gslib/tests/test_copy_helper_funcs.py |
diff --git a/third_party/gsutil/gslib/tests/test_copy_helper_funcs.py b/third_party/gsutil/gslib/tests/test_copy_helper_funcs.py |
new file mode 100644 |
index 0000000000000000000000000000000000000000..accff032c64f2b417f4b3c5c727c7744a54fe85b |
--- /dev/null |
+++ b/third_party/gsutil/gslib/tests/test_copy_helper_funcs.py |
@@ -0,0 +1,386 @@ |
+# -*- coding: utf-8 -*- |
+# Copyright 2013 Google Inc. All Rights Reserved. |
+# |
+# Licensed under the Apache License, Version 2.0 (the "License"); |
+# you may not use this file except in compliance with the License. |
+# You may obtain a copy of the License at |
+# |
+# http://www.apache.org/licenses/LICENSE-2.0 |
+# |
+# Unless required by applicable law or agreed to in writing, software |
+# distributed under the License is distributed on an "AS IS" BASIS, |
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
+# See the License for the specific language governing permissions and |
+# limitations under the License. |
+"""Unit tests for parallel upload functions in copy_helper.""" |
+ |
+from apitools.base.py import exceptions as apitools_exceptions |
+ |
+from gslib.tests.util import GSMockBucketStorageUri
+ |
+from gslib.cloud_api import ResumableUploadAbortException |
+from gslib.cloud_api import ResumableUploadException |
+from gslib.cloud_api import ResumableUploadStartOverException |
+from gslib.cloud_api import ServiceException |
+from gslib.command import CreateGsutilLogger |
+from gslib.copy_helper import _AppendComponentTrackerToParallelUploadTrackerFile |
+from gslib.copy_helper import _CreateParallelUploadTrackerFile |
+from gslib.copy_helper import _GetPartitionInfo |
+from gslib.copy_helper import _ParseParallelUploadTrackerFile |
+from gslib.copy_helper import FilterExistingComponents |
+from gslib.copy_helper import ObjectFromTracker |
+from gslib.copy_helper import PerformParallelUploadFileToObjectArgs |
+from gslib.gcs_json_api import GcsJsonApi |
+from gslib.hashing_helper import CalculateB64EncodedMd5FromContents |
+from gslib.storage_url import StorageUrlFromString |
+from gslib.tests.mock_cloud_api import MockCloudApi |
+from gslib.tests.testcase.unit_testcase import GsUtilUnitTestCase |
+from gslib.third_party.storage_apitools import storage_v1_messages as apitools_messages |
+from gslib.util import CreateLock |
+ |
+ |
+class TestCpFuncs(GsUtilUnitTestCase): |
+ """Unit tests for parallel upload functions in cp command.""" |
+ |
+ def test_GetPartitionInfo(self): |
+ """Tests the _GetPartitionInfo function.""" |
+ # Simplest case - threshold divides file_size. |
+ (num_components, component_size) = _GetPartitionInfo(300, 200, 10) |
+ self.assertEqual(30, num_components) |
+ self.assertEqual(10, component_size) |
+ |
+    # file_size = 1 (mod threshold).
+ (num_components, component_size) = _GetPartitionInfo(301, 200, 10) |
+ self.assertEqual(31, num_components) |
+ self.assertEqual(10, component_size) |
+ |
+    # file_size = -1 (mod threshold).
+ (num_components, component_size) = _GetPartitionInfo(299, 200, 10) |
+ self.assertEqual(30, num_components) |
+ self.assertEqual(10, component_size) |
+ |
+ # Too many components needed. |
+ (num_components, component_size) = _GetPartitionInfo(301, 2, 10) |
+ self.assertEqual(2, num_components) |
+ self.assertEqual(151, component_size) |
+ |
+ # Test num_components with huge numbers. |
+ (num_components, component_size) = _GetPartitionInfo((10 ** 150) + 1, |
+ 10 ** 200, |
+ 10) |
+ self.assertEqual((10 ** 149) + 1, num_components) |
+ self.assertEqual(10, component_size) |
+ |
+ # Test component_size with huge numbers. |
+ (num_components, component_size) = _GetPartitionInfo((10 ** 150) + 1, |
+ 10, |
+ 10) |
+ self.assertEqual(10, num_components) |
+ self.assertEqual((10 ** 149) + 1, component_size) |
+ |
+    # Test component_size > file_size (ensure we get at least two components).
+ (num_components, component_size) = _GetPartitionInfo(100, 500, 51) |
+    self.assertEqual(2, num_components)
+ self.assertEqual(50, component_size) |
+ |
+ def test_ParseParallelUploadTrackerFile(self): |
+ """Tests the _ParseParallelUploadTrackerFile function.""" |
+ tracker_file_lock = CreateLock() |
+ random_prefix = '123' |
+ objects = ['obj1', '42', 'obj2', '314159'] |
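+    # A parallel upload tracker file holds the random prefix on its first
+    # line, followed by alternating lines of component object name and
+    # generation.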
+ contents = '\n'.join([random_prefix] + objects) |
+ fpath = self.CreateTempFile(file_name='foo', contents=contents) |
+ expected_objects = [ObjectFromTracker(objects[2 * i], objects[2 * i + 1]) |
+ for i in range(0, len(objects) / 2)] |
+ (actual_prefix, actual_objects) = _ParseParallelUploadTrackerFile( |
+ fpath, tracker_file_lock) |
+ self.assertEqual(random_prefix, actual_prefix) |
+ self.assertEqual(expected_objects, actual_objects) |
+ |
+ def test_ParseEmptyParallelUploadTrackerFile(self): |
+ """Tests _ParseParallelUploadTrackerFile with an empty tracker file.""" |
+ tracker_file_lock = CreateLock() |
+ fpath = self.CreateTempFile(file_name='foo', contents='') |
+ expected_objects = [] |
+ (actual_prefix, actual_objects) = _ParseParallelUploadTrackerFile( |
+ fpath, tracker_file_lock) |
+ self.assertEqual(actual_objects, expected_objects) |
+ self.assertIsNotNone(actual_prefix) |
+ |
+ def test_CreateParallelUploadTrackerFile(self): |
+ """Tests the _CreateParallelUploadTrackerFile function.""" |
+ tracker_file = self.CreateTempFile(file_name='foo', contents='asdf') |
+ tracker_file_lock = CreateLock() |
+ random_prefix = '123' |
+ objects = ['obj1', '42', 'obj2', '314159'] |
+ expected_contents = [random_prefix] + objects |
+ objects = [ObjectFromTracker(objects[2 * i], objects[2 * i + 1]) |
+ for i in range(0, len(objects) / 2)] |
+ _CreateParallelUploadTrackerFile(tracker_file, random_prefix, objects, |
+ tracker_file_lock) |
+ with open(tracker_file, 'rb') as f: |
+ lines = f.read().splitlines() |
+ self.assertEqual(expected_contents, lines) |
+ |
+ def test_AppendComponentTrackerToParallelUploadTrackerFile(self): |
+ """Tests the _CreateParallelUploadTrackerFile function with append.""" |
+ tracker_file = self.CreateTempFile(file_name='foo', contents='asdf') |
+ tracker_file_lock = CreateLock() |
+ random_prefix = '123' |
+ objects = ['obj1', '42', 'obj2', '314159'] |
+ expected_contents = [random_prefix] + objects |
+ objects = [ObjectFromTracker(objects[2 * i], objects[2 * i + 1]) |
+ for i in range(0, len(objects) / 2)] |
+ _CreateParallelUploadTrackerFile(tracker_file, random_prefix, objects, |
+ tracker_file_lock) |
+ |
+ new_object = ['obj2', '1234'] |
+ expected_contents += new_object |
+ new_object = ObjectFromTracker(new_object[0], new_object[1]) |
+ _AppendComponentTrackerToParallelUploadTrackerFile(tracker_file, new_object, |
+ tracker_file_lock) |
+ with open(tracker_file, 'rb') as f: |
+ lines = f.read().splitlines() |
+ self.assertEqual(expected_contents, lines) |
+ |
+ def test_FilterExistingComponentsNonVersioned(self): |
+ """Tests upload with a variety of component states.""" |
+ mock_api = MockCloudApi() |
+ bucket_name = self.MakeTempName('bucket') |
+ tracker_file = self.CreateTempFile(file_name='foo', contents='asdf') |
+ tracker_file_lock = CreateLock() |
+ |
+ # dst_obj_metadata used for passing content-type. |
+ empty_object = apitools_messages.Object() |
+ |
+ # Already uploaded, contents still match, component still used. |
+ fpath_uploaded_correctly = self.CreateTempFile(file_name='foo1', |
+ contents='1') |
+ fpath_uploaded_correctly_url = StorageUrlFromString( |
+ str(fpath_uploaded_correctly)) |
+ object_uploaded_correctly_url = StorageUrlFromString('%s://%s/%s' % ( |
+ self.default_provider, bucket_name, |
+ fpath_uploaded_correctly)) |
+ with open(fpath_uploaded_correctly) as f_in: |
+ fpath_uploaded_correctly_md5 = CalculateB64EncodedMd5FromContents(f_in) |
+ mock_api.MockCreateObjectWithMetadata( |
+ apitools_messages.Object(bucket=bucket_name, |
+ name=fpath_uploaded_correctly, |
+ md5Hash=fpath_uploaded_correctly_md5), |
+ contents='1') |
+ |
+ args_uploaded_correctly = PerformParallelUploadFileToObjectArgs( |
+ fpath_uploaded_correctly, 0, 1, fpath_uploaded_correctly_url, |
+ object_uploaded_correctly_url, '', empty_object, tracker_file, |
+ tracker_file_lock) |
+ |
+ # Not yet uploaded, but needed. |
+ fpath_not_uploaded = self.CreateTempFile(file_name='foo2', contents='2') |
+ fpath_not_uploaded_url = StorageUrlFromString(str(fpath_not_uploaded)) |
+ object_not_uploaded_url = StorageUrlFromString('%s://%s/%s' % ( |
+ self.default_provider, bucket_name, fpath_not_uploaded)) |
+ args_not_uploaded = PerformParallelUploadFileToObjectArgs( |
+ fpath_not_uploaded, 0, 1, fpath_not_uploaded_url, |
+ object_not_uploaded_url, '', empty_object, tracker_file, |
+ tracker_file_lock) |
+ |
+ # Already uploaded, but contents no longer match. Even though the contents |
+ # differ, we don't delete this since the bucket is not versioned and it |
+ # will be overwritten anyway. |
+ fpath_wrong_contents = self.CreateTempFile(file_name='foo4', contents='4') |
+ fpath_wrong_contents_url = StorageUrlFromString(str(fpath_wrong_contents)) |
+ object_wrong_contents_url = StorageUrlFromString('%s://%s/%s' % ( |
+ self.default_provider, bucket_name, fpath_wrong_contents)) |
+ with open(self.CreateTempFile(contents='_')) as f_in: |
+ fpath_wrong_contents_md5 = CalculateB64EncodedMd5FromContents(f_in) |
+ mock_api.MockCreateObjectWithMetadata( |
+ apitools_messages.Object(bucket=bucket_name, |
+ name=fpath_wrong_contents, |
+ md5Hash=fpath_wrong_contents_md5), |
+ contents='1') |
+ |
+ args_wrong_contents = PerformParallelUploadFileToObjectArgs( |
+ fpath_wrong_contents, 0, 1, fpath_wrong_contents_url, |
+ object_wrong_contents_url, '', empty_object, tracker_file, |
+ tracker_file_lock) |
+ |
+ # Exists in tracker file, but component object no longer exists. |
+ fpath_remote_deleted = self.CreateTempFile(file_name='foo5', contents='5') |
+ fpath_remote_deleted_url = StorageUrlFromString( |
+ str(fpath_remote_deleted)) |
+ args_remote_deleted = PerformParallelUploadFileToObjectArgs( |
+ fpath_remote_deleted, 0, 1, fpath_remote_deleted_url, '', '', |
+ empty_object, tracker_file, tracker_file_lock) |
+ |
+ # Exists in tracker file and already uploaded, but no longer needed. |
+ fpath_no_longer_used = self.CreateTempFile(file_name='foo6', contents='6') |
+ with open(fpath_no_longer_used) as f_in: |
+ file_md5 = CalculateB64EncodedMd5FromContents(f_in) |
+ mock_api.MockCreateObjectWithMetadata( |
+ apitools_messages.Object(bucket=bucket_name, |
+ name='foo6', md5Hash=file_md5), contents='6') |
+ |
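+    # dst_args maps each component file to its upload args;
+    # fpath_no_longer_used is deliberately omitted because that component is
+    # no longer needed.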
+ dst_args = {fpath_uploaded_correctly: args_uploaded_correctly, |
+ fpath_not_uploaded: args_not_uploaded, |
+ fpath_wrong_contents: args_wrong_contents, |
+ fpath_remote_deleted: args_remote_deleted} |
+ |
+ existing_components = [ObjectFromTracker(fpath_uploaded_correctly, ''), |
+ ObjectFromTracker(fpath_wrong_contents, ''), |
+ ObjectFromTracker(fpath_remote_deleted, ''), |
+ ObjectFromTracker(fpath_no_longer_used, '')] |
+ |
+ bucket_url = StorageUrlFromString('%s://%s' % (self.default_provider, |
+ bucket_name)) |
+ |
+ (components_to_upload, uploaded_components, existing_objects_to_delete) = ( |
+ FilterExistingComponents(dst_args, existing_components, |
+ bucket_url, mock_api)) |
+ |
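+    # The missing, stale, and remotely-deleted components must be uploaded;
+    # the matching component is reused and the unneeded one is deleted.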
+ for arg in [args_not_uploaded, args_wrong_contents, args_remote_deleted]: |
+ self.assertTrue(arg in components_to_upload) |
+ self.assertEqual(1, len(uploaded_components)) |
+ self.assertEqual(args_uploaded_correctly.dst_url.url_string, |
+ uploaded_components[0].url_string) |
+ self.assertEqual(1, len(existing_objects_to_delete)) |
+ no_longer_used_url = StorageUrlFromString('%s://%s/%s' % ( |
+ self.default_provider, bucket_name, fpath_no_longer_used)) |
+ self.assertEqual(no_longer_used_url.url_string, |
+ existing_objects_to_delete[0].url_string) |
+ |
+ def test_FilterExistingComponentsVersioned(self): |
+ """Tests upload with versionined parallel components.""" |
+ |
+ mock_api = MockCloudApi() |
+ bucket_name = self.MakeTempName('bucket') |
+ mock_api.MockCreateVersionedBucket(bucket_name) |
+ |
+ # dst_obj_metadata used for passing content-type. |
+ empty_object = apitools_messages.Object() |
+ |
+ tracker_file = self.CreateTempFile(file_name='foo', contents='asdf') |
+ tracker_file_lock = CreateLock() |
+ |
+ # Already uploaded, contents still match, component still used. |
+ fpath_uploaded_correctly = self.CreateTempFile(file_name='foo1', |
+ contents='1') |
+ fpath_uploaded_correctly_url = StorageUrlFromString( |
+ str(fpath_uploaded_correctly)) |
+ with open(fpath_uploaded_correctly) as f_in: |
+ fpath_uploaded_correctly_md5 = CalculateB64EncodedMd5FromContents(f_in) |
+ object_uploaded_correctly = mock_api.MockCreateObjectWithMetadata( |
+ apitools_messages.Object(bucket=bucket_name, |
+ name=fpath_uploaded_correctly, |
+ md5Hash=fpath_uploaded_correctly_md5), |
+ contents='1') |
+ object_uploaded_correctly_url = StorageUrlFromString('%s://%s/%s#%s' % ( |
+ self.default_provider, bucket_name, |
+ fpath_uploaded_correctly, object_uploaded_correctly.generation)) |
+ args_uploaded_correctly = PerformParallelUploadFileToObjectArgs( |
+ fpath_uploaded_correctly, 0, 1, fpath_uploaded_correctly_url, |
+ object_uploaded_correctly_url, object_uploaded_correctly.generation, |
+ empty_object, tracker_file, tracker_file_lock) |
+ |
+ # Duplicate object name in tracker file, but uploaded correctly. |
+ fpath_duplicate = fpath_uploaded_correctly |
+ fpath_duplicate_url = StorageUrlFromString(str(fpath_duplicate)) |
+ duplicate_uploaded_correctly = mock_api.MockCreateObjectWithMetadata( |
+ apitools_messages.Object(bucket=bucket_name, |
+ name=fpath_duplicate, |
+ md5Hash=fpath_uploaded_correctly_md5), |
+ contents='1') |
+ duplicate_uploaded_correctly_url = StorageUrlFromString('%s://%s/%s#%s' % ( |
+ self.default_provider, bucket_name, |
+ fpath_uploaded_correctly, duplicate_uploaded_correctly.generation)) |
+ args_duplicate = PerformParallelUploadFileToObjectArgs( |
+ fpath_duplicate, 0, 1, fpath_duplicate_url, |
+ duplicate_uploaded_correctly_url, |
+ duplicate_uploaded_correctly.generation, empty_object, tracker_file, |
+ tracker_file_lock) |
+ |
+ # Already uploaded, but contents no longer match. |
+ fpath_wrong_contents = self.CreateTempFile(file_name='foo4', contents='4') |
+ fpath_wrong_contents_url = StorageUrlFromString(str(fpath_wrong_contents)) |
+ with open(self.CreateTempFile(contents='_')) as f_in: |
+ fpath_wrong_contents_md5 = CalculateB64EncodedMd5FromContents(f_in) |
+ object_wrong_contents = mock_api.MockCreateObjectWithMetadata( |
+ apitools_messages.Object(bucket=bucket_name, |
+ name=fpath_wrong_contents, |
+ md5Hash=fpath_wrong_contents_md5), |
+ contents='_') |
+ wrong_contents_url = StorageUrlFromString('%s://%s/%s#%s' % ( |
+ self.default_provider, bucket_name, |
+ fpath_wrong_contents, object_wrong_contents.generation)) |
+ args_wrong_contents = PerformParallelUploadFileToObjectArgs( |
+ fpath_wrong_contents, 0, 1, fpath_wrong_contents_url, |
+ wrong_contents_url, '', empty_object, tracker_file, |
+ tracker_file_lock) |
+ |
+ dst_args = {fpath_uploaded_correctly: args_uploaded_correctly, |
+ fpath_wrong_contents: args_wrong_contents} |
+ |
+ existing_components = [ |
+ ObjectFromTracker(fpath_uploaded_correctly, |
+ object_uploaded_correctly_url.generation), |
+ ObjectFromTracker(fpath_duplicate, |
+ duplicate_uploaded_correctly_url.generation), |
+ ObjectFromTracker(fpath_wrong_contents, |
+ wrong_contents_url.generation)] |
+ |
+ bucket_url = StorageUrlFromString('%s://%s' % (self.default_provider, |
+ bucket_name)) |
+ |
+ (components_to_upload, uploaded_components, existing_objects_to_delete) = ( |
+ FilterExistingComponents(dst_args, existing_components, |
+ bucket_url, mock_api)) |
+ |
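+    # Only the stale component is re-uploaded; its old generation and the
+    # duplicate's generation are scheduled for deletion.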
+ self.assertEqual([args_wrong_contents], components_to_upload) |
+ self.assertEqual(args_uploaded_correctly.dst_url.url_string, |
+ uploaded_components[0].url_string) |
+ expected_to_delete = [(args_wrong_contents.dst_url.object_name, |
+ args_wrong_contents.dst_url.generation), |
+ (args_duplicate.dst_url.object_name, |
+ args_duplicate.dst_url.generation)] |
+ for uri in existing_objects_to_delete: |
+ self.assertTrue((uri.object_name, uri.generation) in expected_to_delete) |
+ self.assertEqual(len(expected_to_delete), len(existing_objects_to_delete)) |
+ |
+ # pylint: disable=protected-access |
+ def test_TranslateApitoolsResumableUploadException(self): |
+ """Tests that _TranslateApitoolsResumableUploadException works correctly.""" |
+ gsutil_api = GcsJsonApi( |
+ GSMockBucketStorageUri, |
+ CreateGsutilLogger('copy_test')) |
+ |
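+    # With SSL certificate validation disabled, a 503 is translated to a
+    # generic ServiceException rather than a ResumableUploadException.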
+ gsutil_api.http.disable_ssl_certificate_validation = True |
+ exc = apitools_exceptions.HttpError({'status': 503}, None, None) |
+ translated_exc = gsutil_api._TranslateApitoolsResumableUploadException(exc) |
+ self.assertTrue(isinstance(translated_exc, ServiceException)) |
+ |
+ gsutil_api.http.disable_ssl_certificate_validation = False |
+ exc = apitools_exceptions.HttpError({'status': 503}, None, None) |
+ translated_exc = gsutil_api._TranslateApitoolsResumableUploadException(exc) |
+ self.assertTrue(isinstance(translated_exc, ResumableUploadException)) |
+ |
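+    # A 429 (Too Many Requests) also maps to ResumableUploadException.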
+ gsutil_api.http.disable_ssl_certificate_validation = False |
+ exc = apitools_exceptions.HttpError({'status': 429}, None, None) |
+ translated_exc = gsutil_api._TranslateApitoolsResumableUploadException(exc) |
+ self.assertTrue(isinstance(translated_exc, ResumableUploadException)) |
+ |
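+    # 410 and 404 responses mean the upload must be restarted from scratch.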
+ exc = apitools_exceptions.HttpError({'status': 410}, None, None) |
+ translated_exc = gsutil_api._TranslateApitoolsResumableUploadException(exc) |
+ self.assertTrue(isinstance(translated_exc, |
+ ResumableUploadStartOverException)) |
+ |
+ exc = apitools_exceptions.HttpError({'status': 404}, None, None) |
+ translated_exc = gsutil_api._TranslateApitoolsResumableUploadException(exc) |
+ self.assertTrue(isinstance(translated_exc, |
+ ResumableUploadStartOverException)) |
+ |
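+    # A 401 response means the upload should be aborted.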
+ exc = apitools_exceptions.HttpError({'status': 401}, None, None) |
+ translated_exc = gsutil_api._TranslateApitoolsResumableUploadException(exc) |
+ self.assertTrue(isinstance(translated_exc, ResumableUploadAbortException)) |
+ |
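+    # An apitools TransferError likewise aborts the upload.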
+ exc = apitools_exceptions.TransferError('Aborting transfer') |
+ translated_exc = gsutil_api._TranslateApitoolsResumableUploadException(exc) |
+ self.assertTrue(isinstance(translated_exc, ResumableUploadAbortException)) |