# -*- coding: utf-8 -*-
# Copyright 2013 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
| 15 """Unit tests for parallel upload functions in copy_helper.""" |
| 16 |
| 17 from apitools.base.py import exceptions as apitools_exceptions |
| 18 |
| 19 from util import GSMockBucketStorageUri |
| 20 |
| 21 from gslib.cloud_api import ResumableUploadAbortException |
| 22 from gslib.cloud_api import ResumableUploadException |
| 23 from gslib.cloud_api import ResumableUploadStartOverException |
| 24 from gslib.cloud_api import ServiceException |
| 25 from gslib.command import CreateGsutilLogger |
| 26 from gslib.copy_helper import _AppendComponentTrackerToParallelUploadTrackerFile |
| 27 from gslib.copy_helper import _CreateParallelUploadTrackerFile |
| 28 from gslib.copy_helper import _GetPartitionInfo |
| 29 from gslib.copy_helper import _ParseParallelUploadTrackerFile |
| 30 from gslib.copy_helper import FilterExistingComponents |
| 31 from gslib.copy_helper import ObjectFromTracker |
| 32 from gslib.copy_helper import PerformParallelUploadFileToObjectArgs |
| 33 from gslib.gcs_json_api import GcsJsonApi |
| 34 from gslib.hashing_helper import CalculateB64EncodedMd5FromContents |
| 35 from gslib.storage_url import StorageUrlFromString |
| 36 from gslib.tests.mock_cloud_api import MockCloudApi |
| 37 from gslib.tests.testcase.unit_testcase import GsUtilUnitTestCase |
| 38 from gslib.third_party.storage_apitools import storage_v1_messages as apitools_m
essages |
| 39 from gslib.util import CreateLock |


class TestCpFuncs(GsUtilUnitTestCase):
  """Unit tests for parallel upload functions in cp command."""

  def test_GetPartitionInfo(self):
    """Tests the _GetPartitionInfo function."""
    # Simplest case - default_component_size divides file_size.
    (num_components, component_size) = _GetPartitionInfo(300, 200, 10)
    self.assertEqual(30, num_components)
    self.assertEqual(10, component_size)

    # file_size = 1 (mod component_size).
    (num_components, component_size) = _GetPartitionInfo(301, 200, 10)
    self.assertEqual(31, num_components)
    self.assertEqual(10, component_size)

    # file_size = -1 (mod component_size).
    (num_components, component_size) = _GetPartitionInfo(299, 200, 10)
    self.assertEqual(30, num_components)
    self.assertEqual(10, component_size)

    # Too many components needed; num_components is capped at max_components.
    (num_components, component_size) = _GetPartitionInfo(301, 2, 10)
    self.assertEqual(2, num_components)
    self.assertEqual(151, component_size)

    # Test num_components with huge numbers.
    (num_components, component_size) = _GetPartitionInfo(
        (10 ** 150) + 1, 10 ** 200, 10)
    self.assertEqual((10 ** 149) + 1, num_components)
    self.assertEqual(10, component_size)

    # Test component_size with huge numbers.
    (num_components, component_size) = _GetPartitionInfo(
        (10 ** 150) + 1, 10, 10)
    self.assertEqual(10, num_components)
    self.assertEqual((10 ** 149) + 1, component_size)

    # default_component_size > file_size / 2 (make sure we still get at least
    # two components).
    (num_components, component_size) = _GetPartitionInfo(100, 500, 51)
    self.assertEqual(2, num_components)
    self.assertEqual(50, component_size)

  def test_ParseParallelUploadTrackerFile(self):
    """Tests the _ParseParallelUploadTrackerFile function."""
    tracker_file_lock = CreateLock()
    random_prefix = '123'
    objects = ['obj1', '42', 'obj2', '314159']
    contents = '\n'.join([random_prefix] + objects)
    fpath = self.CreateTempFile(file_name='foo', contents=contents)
    expected_objects = [ObjectFromTracker(objects[2 * i], objects[2 * i + 1])
                        for i in range(0, len(objects) // 2)]
    (actual_prefix, actual_objects) = _ParseParallelUploadTrackerFile(
        fpath, tracker_file_lock)
    self.assertEqual(random_prefix, actual_prefix)
    self.assertEqual(expected_objects, actual_objects)

  def test_ParseEmptyParallelUploadTrackerFile(self):
    """Tests _ParseParallelUploadTrackerFile with an empty tracker file."""
    tracker_file_lock = CreateLock()
    fpath = self.CreateTempFile(file_name='foo', contents='')
    expected_objects = []
    (actual_prefix, actual_objects) = _ParseParallelUploadTrackerFile(
        fpath, tracker_file_lock)
    self.assertEqual(actual_objects, expected_objects)
    self.assertIsNotNone(actual_prefix)

  def test_CreateParallelUploadTrackerFile(self):
    """Tests the _CreateParallelUploadTrackerFile function."""
    tracker_file = self.CreateTempFile(file_name='foo', contents='asdf')
    tracker_file_lock = CreateLock()
    random_prefix = '123'
    objects = ['obj1', '42', 'obj2', '314159']
    expected_contents = [random_prefix] + objects
    objects = [ObjectFromTracker(objects[2 * i], objects[2 * i + 1])
               for i in range(0, len(objects) // 2)]
    _CreateParallelUploadTrackerFile(tracker_file, random_prefix, objects,
                                     tracker_file_lock)
    with open(tracker_file, 'rb') as f:
      lines = f.read().splitlines()
    self.assertEqual(expected_contents, lines)

  def test_AppendComponentTrackerToParallelUploadTrackerFile(self):
    """Tests appending to a parallel upload tracker file."""
    tracker_file = self.CreateTempFile(file_name='foo', contents='asdf')
    tracker_file_lock = CreateLock()
    random_prefix = '123'
    objects = ['obj1', '42', 'obj2', '314159']
    expected_contents = [random_prefix] + objects
    objects = [ObjectFromTracker(objects[2 * i], objects[2 * i + 1])
               for i in range(0, len(objects) // 2)]
    _CreateParallelUploadTrackerFile(tracker_file, random_prefix, objects,
                                     tracker_file_lock)

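    # Appending a component should add its name and generation as two more
    # lines at the end of the file.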
    new_object = ['obj2', '1234']
    expected_contents += new_object
    new_object = ObjectFromTracker(new_object[0], new_object[1])
    _AppendComponentTrackerToParallelUploadTrackerFile(tracker_file, new_object,
                                                       tracker_file_lock)
    with open(tracker_file, 'rb') as f:
      lines = f.read().splitlines()
    self.assertEqual(expected_contents, lines)

  def test_FilterExistingComponentsNonVersioned(self):
    """Tests upload with a variety of component states."""
    mock_api = MockCloudApi()
    bucket_name = self.MakeTempName('bucket')
    tracker_file = self.CreateTempFile(file_name='foo', contents='asdf')
    tracker_file_lock = CreateLock()

    # dst_obj_metadata used for passing content-type.
    empty_object = apitools_messages.Object()

    # Already uploaded, contents still match, component still used.
    fpath_uploaded_correctly = self.CreateTempFile(file_name='foo1',
                                                   contents='1')
    fpath_uploaded_correctly_url = StorageUrlFromString(
        str(fpath_uploaded_correctly))
    object_uploaded_correctly_url = StorageUrlFromString('%s://%s/%s' % (
        self.default_provider, bucket_name,
        fpath_uploaded_correctly))
    with open(fpath_uploaded_correctly) as f_in:
      fpath_uploaded_correctly_md5 = CalculateB64EncodedMd5FromContents(f_in)
    mock_api.MockCreateObjectWithMetadata(
        apitools_messages.Object(bucket=bucket_name,
                                 name=fpath_uploaded_correctly,
                                 md5Hash=fpath_uploaded_correctly_md5),
        contents='1')

    args_uploaded_correctly = PerformParallelUploadFileToObjectArgs(
        fpath_uploaded_correctly, 0, 1, fpath_uploaded_correctly_url,
        object_uploaded_correctly_url, '', empty_object, tracker_file,
        tracker_file_lock)

    # Not yet uploaded, but needed.
    fpath_not_uploaded = self.CreateTempFile(file_name='foo2', contents='2')
    fpath_not_uploaded_url = StorageUrlFromString(str(fpath_not_uploaded))
    object_not_uploaded_url = StorageUrlFromString('%s://%s/%s' % (
        self.default_provider, bucket_name, fpath_not_uploaded))
    args_not_uploaded = PerformParallelUploadFileToObjectArgs(
        fpath_not_uploaded, 0, 1, fpath_not_uploaded_url,
        object_not_uploaded_url, '', empty_object, tracker_file,
        tracker_file_lock)

    # Already uploaded, but contents no longer match. Even though the contents
    # differ, we don't delete this since the bucket is not versioned and it
    # will be overwritten anyway.
    fpath_wrong_contents = self.CreateTempFile(file_name='foo4', contents='4')
    fpath_wrong_contents_url = StorageUrlFromString(str(fpath_wrong_contents))
    object_wrong_contents_url = StorageUrlFromString('%s://%s/%s' % (
        self.default_provider, bucket_name, fpath_wrong_contents))
    with open(self.CreateTempFile(contents='_')) as f_in:
      fpath_wrong_contents_md5 = CalculateB64EncodedMd5FromContents(f_in)
    mock_api.MockCreateObjectWithMetadata(
        apitools_messages.Object(bucket=bucket_name,
                                 name=fpath_wrong_contents,
                                 md5Hash=fpath_wrong_contents_md5),
        contents='1')

    args_wrong_contents = PerformParallelUploadFileToObjectArgs(
        fpath_wrong_contents, 0, 1, fpath_wrong_contents_url,
        object_wrong_contents_url, '', empty_object, tracker_file,
        tracker_file_lock)

    # Exists in tracker file, but component object no longer exists.
    fpath_remote_deleted = self.CreateTempFile(file_name='foo5', contents='5')
    fpath_remote_deleted_url = StorageUrlFromString(
        str(fpath_remote_deleted))
    args_remote_deleted = PerformParallelUploadFileToObjectArgs(
        fpath_remote_deleted, 0, 1, fpath_remote_deleted_url, '', '',
        empty_object, tracker_file, tracker_file_lock)

    # Exists in tracker file and already uploaded, but no longer needed.
    fpath_no_longer_used = self.CreateTempFile(file_name='foo6', contents='6')
    with open(fpath_no_longer_used) as f_in:
      file_md5 = CalculateB64EncodedMd5FromContents(f_in)
    mock_api.MockCreateObjectWithMetadata(
        apitools_messages.Object(bucket=bucket_name,
                                 name='foo6', md5Hash=file_md5), contents='6')

    dst_args = {fpath_uploaded_correctly: args_uploaded_correctly,
                fpath_not_uploaded: args_not_uploaded,
                fpath_wrong_contents: args_wrong_contents,
                fpath_remote_deleted: args_remote_deleted}

    existing_components = [ObjectFromTracker(fpath_uploaded_correctly, ''),
                           ObjectFromTracker(fpath_wrong_contents, ''),
                           ObjectFromTracker(fpath_remote_deleted, ''),
                           ObjectFromTracker(fpath_no_longer_used, '')]

    bucket_url = StorageUrlFromString('%s://%s' % (self.default_provider,
                                                   bucket_name))

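    # FilterExistingComponents partitions the components into those that
    # still need to be uploaded, those already uploaded and reusable, and
    # stale objects that should be deleted.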
    (components_to_upload, uploaded_components, existing_objects_to_delete) = (
        FilterExistingComponents(dst_args, existing_components,
                                 bucket_url, mock_api))

    for arg in [args_not_uploaded, args_wrong_contents, args_remote_deleted]:
      self.assertIn(arg, components_to_upload)
    self.assertEqual(1, len(uploaded_components))
    self.assertEqual(args_uploaded_correctly.dst_url.url_string,
                     uploaded_components[0].url_string)
    self.assertEqual(1, len(existing_objects_to_delete))
    no_longer_used_url = StorageUrlFromString('%s://%s/%s' % (
        self.default_provider, bucket_name, fpath_no_longer_used))
    self.assertEqual(no_longer_used_url.url_string,
                     existing_objects_to_delete[0].url_string)

  def test_FilterExistingComponentsVersioned(self):
    """Tests upload with versioned parallel components."""
    mock_api = MockCloudApi()
    bucket_name = self.MakeTempName('bucket')
    mock_api.MockCreateVersionedBucket(bucket_name)

    # dst_obj_metadata used for passing content-type.
    empty_object = apitools_messages.Object()

    tracker_file = self.CreateTempFile(file_name='foo', contents='asdf')
    tracker_file_lock = CreateLock()

    # Already uploaded, contents still match, component still used.
    fpath_uploaded_correctly = self.CreateTempFile(file_name='foo1',
                                                   contents='1')
    fpath_uploaded_correctly_url = StorageUrlFromString(
        str(fpath_uploaded_correctly))
    with open(fpath_uploaded_correctly) as f_in:
      fpath_uploaded_correctly_md5 = CalculateB64EncodedMd5FromContents(f_in)
    object_uploaded_correctly = mock_api.MockCreateObjectWithMetadata(
        apitools_messages.Object(bucket=bucket_name,
                                 name=fpath_uploaded_correctly,
                                 md5Hash=fpath_uploaded_correctly_md5),
        contents='1')
    object_uploaded_correctly_url = StorageUrlFromString('%s://%s/%s#%s' % (
        self.default_provider, bucket_name,
        fpath_uploaded_correctly, object_uploaded_correctly.generation))
    args_uploaded_correctly = PerformParallelUploadFileToObjectArgs(
        fpath_uploaded_correctly, 0, 1, fpath_uploaded_correctly_url,
        object_uploaded_correctly_url, object_uploaded_correctly.generation,
        empty_object, tracker_file, tracker_file_lock)

    # Duplicate object name in tracker file, but uploaded correctly.
    fpath_duplicate = fpath_uploaded_correctly
    fpath_duplicate_url = StorageUrlFromString(str(fpath_duplicate))
    duplicate_uploaded_correctly = mock_api.MockCreateObjectWithMetadata(
        apitools_messages.Object(bucket=bucket_name,
                                 name=fpath_duplicate,
                                 md5Hash=fpath_uploaded_correctly_md5),
        contents='1')
    duplicate_uploaded_correctly_url = StorageUrlFromString('%s://%s/%s#%s' % (
        self.default_provider, bucket_name,
        fpath_uploaded_correctly, duplicate_uploaded_correctly.generation))
    args_duplicate = PerformParallelUploadFileToObjectArgs(
        fpath_duplicate, 0, 1, fpath_duplicate_url,
        duplicate_uploaded_correctly_url,
        duplicate_uploaded_correctly.generation, empty_object, tracker_file,
        tracker_file_lock)

    # Already uploaded, but contents no longer match.
    fpath_wrong_contents = self.CreateTempFile(file_name='foo4', contents='4')
    fpath_wrong_contents_url = StorageUrlFromString(str(fpath_wrong_contents))
    with open(self.CreateTempFile(contents='_')) as f_in:
      fpath_wrong_contents_md5 = CalculateB64EncodedMd5FromContents(f_in)
    object_wrong_contents = mock_api.MockCreateObjectWithMetadata(
        apitools_messages.Object(bucket=bucket_name,
                                 name=fpath_wrong_contents,
                                 md5Hash=fpath_wrong_contents_md5),
        contents='_')
    wrong_contents_url = StorageUrlFromString('%s://%s/%s#%s' % (
        self.default_provider, bucket_name,
        fpath_wrong_contents, object_wrong_contents.generation))
    args_wrong_contents = PerformParallelUploadFileToObjectArgs(
        fpath_wrong_contents, 0, 1, fpath_wrong_contents_url,
        wrong_contents_url, '', empty_object, tracker_file,
        tracker_file_lock)

    dst_args = {fpath_uploaded_correctly: args_uploaded_correctly,
                fpath_wrong_contents: args_wrong_contents}

    existing_components = [
        ObjectFromTracker(fpath_uploaded_correctly,
                          object_uploaded_correctly_url.generation),
        ObjectFromTracker(fpath_duplicate,
                          duplicate_uploaded_correctly_url.generation),
        ObjectFromTracker(fpath_wrong_contents,
                          wrong_contents_url.generation)]

    bucket_url = StorageUrlFromString('%s://%s' % (self.default_provider,
                                                   bucket_name))

    (components_to_upload, uploaded_components, existing_objects_to_delete) = (
        FilterExistingComponents(dst_args, existing_components,
                                 bucket_url, mock_api))

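    # Unlike the non-versioned case, superseded generations in a versioned
    # bucket are not simply overwritten, so the duplicate and wrong-contents
    # generations must be explicitly deleted.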
    self.assertEqual([args_wrong_contents], components_to_upload)
    self.assertEqual(args_uploaded_correctly.dst_url.url_string,
                     uploaded_components[0].url_string)
    expected_to_delete = [(args_wrong_contents.dst_url.object_name,
                           args_wrong_contents.dst_url.generation),
                          (args_duplicate.dst_url.object_name,
                           args_duplicate.dst_url.generation)]
    for uri in existing_objects_to_delete:
      self.assertIn((uri.object_name, uri.generation), expected_to_delete)
    self.assertEqual(len(expected_to_delete), len(existing_objects_to_delete))

  # pylint: disable=protected-access
  def test_TranslateApitoolsResumableUploadException(self):
    """Tests that _TranslateApitoolsResumableUploadException works correctly."""
    gsutil_api = GcsJsonApi(
        GSMockBucketStorageUri,
        CreateGsutilLogger('copy_test'))

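    # With SSL certificate validation disabled, a 503 is translated to a
    # generic ServiceException rather than a retryable resumable-upload error.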
    gsutil_api.http.disable_ssl_certificate_validation = True
    exc = apitools_exceptions.HttpError({'status': 503}, None, None)
    translated_exc = gsutil_api._TranslateApitoolsResumableUploadException(exc)
    self.assertIsInstance(translated_exc, ServiceException)

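    # With SSL validation enabled, a 503 (Service Unavailable) is retryable.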
    gsutil_api.http.disable_ssl_certificate_validation = False
    exc = apitools_exceptions.HttpError({'status': 503}, None, None)
    translated_exc = gsutil_api._TranslateApitoolsResumableUploadException(exc)
    self.assertIsInstance(translated_exc, ResumableUploadException)

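    # A 429 (Too Many Requests) is likewise retryable.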
    gsutil_api.http.disable_ssl_certificate_validation = False
    exc = apitools_exceptions.HttpError({'status': 429}, None, None)
    translated_exc = gsutil_api._TranslateApitoolsResumableUploadException(exc)
    self.assertIsInstance(translated_exc, ResumableUploadException)

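    # A 410 (Gone) means the resumable upload must start over.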
    exc = apitools_exceptions.HttpError({'status': 410}, None, None)
    translated_exc = gsutil_api._TranslateApitoolsResumableUploadException(exc)
    self.assertIsInstance(translated_exc, ResumableUploadStartOverException)

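    # A 404 (Not Found) during a resumable upload also means starting over.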
    exc = apitools_exceptions.HttpError({'status': 404}, None, None)
    translated_exc = gsutil_api._TranslateApitoolsResumableUploadException(exc)
    self.assertIsInstance(translated_exc, ResumableUploadStartOverException)

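    # A 401 (Unauthorized) is not recoverable; the upload is aborted.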
    exc = apitools_exceptions.HttpError({'status': 401}, None, None)
    translated_exc = gsutil_api._TranslateApitoolsResumableUploadException(exc)
    self.assertIsInstance(translated_exc, ResumableUploadAbortException)

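    # A non-HTTP apitools TransferError also aborts the upload.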
    exc = apitools_exceptions.TransferError('Aborting transfer')
    translated_exc = gsutil_api._TranslateApitoolsResumableUploadException(exc)
    self.assertIsInstance(translated_exc, ResumableUploadAbortException)