# -*- coding: utf-8 -*-
# Copyright 2013 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Unit tests for parallel upload functions in copy_helper."""

from apitools.base.py import exceptions as apitools_exceptions

from util import GSMockBucketStorageUri

from gslib.cloud_api import ResumableUploadAbortException
from gslib.cloud_api import ResumableUploadException
from gslib.cloud_api import ResumableUploadStartOverException
from gslib.cloud_api import ServiceException
from gslib.command import CreateGsutilLogger
from gslib.copy_helper import _AppendComponentTrackerToParallelUploadTrackerFile
from gslib.copy_helper import _CreateParallelUploadTrackerFile
from gslib.copy_helper import _GetPartitionInfo
from gslib.copy_helper import _ParseParallelUploadTrackerFile
from gslib.copy_helper import FilterExistingComponents
from gslib.copy_helper import ObjectFromTracker
from gslib.copy_helper import PerformParallelUploadFileToObjectArgs
from gslib.gcs_json_api import GcsJsonApi
from gslib.hashing_helper import CalculateB64EncodedMd5FromContents
from gslib.storage_url import StorageUrlFromString
from gslib.tests.mock_cloud_api import MockCloudApi
from gslib.tests.testcase.unit_testcase import GsUtilUnitTestCase
from gslib.third_party.storage_apitools import storage_v1_messages as apitools_messages
from gslib.util import CreateLock


class TestCpFuncs(GsUtilUnitTestCase):
  """Unit tests for parallel upload functions in cp command."""

  def test_GetPartitionInfo(self):
    """Tests the _GetPartitionInfo function."""
    # Simplest case - threshold divides file_size.
    (num_components, component_size) = _GetPartitionInfo(300, 200, 10)
    self.assertEqual(30, num_components)
    self.assertEqual(10, component_size)

    # file_size = 1 (mod threshold).
    (num_components, component_size) = _GetPartitionInfo(301, 200, 10)
    self.assertEqual(31, num_components)
    self.assertEqual(10, component_size)

    # file_size = -1 (mod threshold).
    (num_components, component_size) = _GetPartitionInfo(299, 200, 10)
    self.assertEqual(30, num_components)
    self.assertEqual(10, component_size)

    # Too many components needed.
    (num_components, component_size) = _GetPartitionInfo(301, 2, 10)
    self.assertEqual(2, num_components)
    self.assertEqual(151, component_size)

    # Test num_components with huge numbers.
    (num_components, component_size) = _GetPartitionInfo(
        (10 ** 150) + 1, 10 ** 200, 10)
    self.assertEqual((10 ** 149) + 1, num_components)
    self.assertEqual(10, component_size)

    # Test component_size with huge numbers.
    (num_components, component_size) = _GetPartitionInfo(
        (10 ** 150) + 1, 10, 10)
    self.assertEqual(10, num_components)
    self.assertEqual((10 ** 149) + 1, component_size)

    # Test component_size > file_size / 2 (make sure we get at least two
    # components).
    (num_components, component_size) = _GetPartitionInfo(100, 500, 51)
    self.assertEqual(2, num_components)
    self.assertEqual(50, component_size)

  def test_ParseParallelUploadTrackerFile(self):
    """Tests the _ParseParallelUploadTrackerFile function."""
    tracker_file_lock = CreateLock()
    random_prefix = '123'
    objects = ['obj1', '42', 'obj2', '314159']
    contents = '\n'.join([random_prefix] + objects)
    fpath = self.CreateTempFile(file_name='foo', contents=contents)
    expected_objects = [ObjectFromTracker(objects[2 * i], objects[2 * i + 1])
                        for i in range(0, len(objects) // 2)]
    (actual_prefix, actual_objects) = _ParseParallelUploadTrackerFile(
        fpath, tracker_file_lock)
    self.assertEqual(random_prefix, actual_prefix)
    self.assertEqual(expected_objects, actual_objects)

  def test_ParseEmptyParallelUploadTrackerFile(self):
    """Tests _ParseParallelUploadTrackerFile with an empty tracker file."""
    tracker_file_lock = CreateLock()
    fpath = self.CreateTempFile(file_name='foo', contents='')
    expected_objects = []
    (actual_prefix, actual_objects) = _ParseParallelUploadTrackerFile(
        fpath, tracker_file_lock)
    self.assertEqual(actual_objects, expected_objects)
    self.assertIsNotNone(actual_prefix)

  def test_CreateParallelUploadTrackerFile(self):
    """Tests the _CreateParallelUploadTrackerFile function."""
    tracker_file = self.CreateTempFile(file_name='foo', contents='asdf')
    tracker_file_lock = CreateLock()
    random_prefix = '123'
    objects = ['obj1', '42', 'obj2', '314159']
    expected_contents = [random_prefix] + objects
    objects = [ObjectFromTracker(objects[2 * i], objects[2 * i + 1])
               for i in range(0, len(objects) // 2)]
    _CreateParallelUploadTrackerFile(tracker_file, random_prefix, objects,
                                     tracker_file_lock)
    with open(tracker_file, 'rb') as f:
      lines = f.read().splitlines()
    self.assertEqual(expected_contents, lines)

  def test_AppendComponentTrackerToParallelUploadTrackerFile(self):
    """Tests appending a component to a parallel upload tracker file."""
    tracker_file = self.CreateTempFile(file_name='foo', contents='asdf')
    tracker_file_lock = CreateLock()
    random_prefix = '123'
    objects = ['obj1', '42', 'obj2', '314159']
    expected_contents = [random_prefix] + objects
    objects = [ObjectFromTracker(objects[2 * i], objects[2 * i + 1])
               for i in range(0, len(objects) // 2)]
    _CreateParallelUploadTrackerFile(tracker_file, random_prefix, objects,
                                     tracker_file_lock)

    new_object = ['obj2', '1234']
    expected_contents += new_object
    new_object = ObjectFromTracker(new_object[0], new_object[1])
    _AppendComponentTrackerToParallelUploadTrackerFile(
        tracker_file, new_object, tracker_file_lock)
    with open(tracker_file, 'rb') as f:
      lines = f.read().splitlines()
    self.assertEqual(expected_contents, lines)

  def test_FilterExistingComponentsNonVersioned(self):
    """Tests upload with a variety of component states."""
    mock_api = MockCloudApi()
    bucket_name = self.MakeTempName('bucket')
    tracker_file = self.CreateTempFile(file_name='foo', contents='asdf')
    tracker_file_lock = CreateLock()

    # dst_obj_metadata used for passing content-type.
    empty_object = apitools_messages.Object()

    # Already uploaded, contents still match, component still used.
    fpath_uploaded_correctly = self.CreateTempFile(file_name='foo1',
                                                   contents='1')
    fpath_uploaded_correctly_url = StorageUrlFromString(
        str(fpath_uploaded_correctly))
    object_uploaded_correctly_url = StorageUrlFromString('%s://%s/%s' % (
        self.default_provider, bucket_name,
        fpath_uploaded_correctly))
    with open(fpath_uploaded_correctly) as f_in:
      fpath_uploaded_correctly_md5 = CalculateB64EncodedMd5FromContents(f_in)
    mock_api.MockCreateObjectWithMetadata(
        apitools_messages.Object(bucket=bucket_name,
                                 name=fpath_uploaded_correctly,
                                 md5Hash=fpath_uploaded_correctly_md5),
        contents='1')

    args_uploaded_correctly = PerformParallelUploadFileToObjectArgs(
        fpath_uploaded_correctly, 0, 1, fpath_uploaded_correctly_url,
        object_uploaded_correctly_url, '', empty_object, tracker_file,
        tracker_file_lock)

    # Not yet uploaded, but needed.
    fpath_not_uploaded = self.CreateTempFile(file_name='foo2', contents='2')
    fpath_not_uploaded_url = StorageUrlFromString(str(fpath_not_uploaded))
    object_not_uploaded_url = StorageUrlFromString('%s://%s/%s' % (
        self.default_provider, bucket_name, fpath_not_uploaded))
    args_not_uploaded = PerformParallelUploadFileToObjectArgs(
        fpath_not_uploaded, 0, 1, fpath_not_uploaded_url,
        object_not_uploaded_url, '', empty_object, tracker_file,
        tracker_file_lock)

    # Already uploaded, but contents no longer match. Even though the contents
    # differ, we don't delete this since the bucket is not versioned and it
    # will be overwritten anyway.
    fpath_wrong_contents = self.CreateTempFile(file_name='foo4', contents='4')
    fpath_wrong_contents_url = StorageUrlFromString(str(fpath_wrong_contents))
    object_wrong_contents_url = StorageUrlFromString('%s://%s/%s' % (
        self.default_provider, bucket_name, fpath_wrong_contents))
    with open(self.CreateTempFile(contents='_')) as f_in:
      fpath_wrong_contents_md5 = CalculateB64EncodedMd5FromContents(f_in)
    mock_api.MockCreateObjectWithMetadata(
        apitools_messages.Object(bucket=bucket_name,
                                 name=fpath_wrong_contents,
                                 md5Hash=fpath_wrong_contents_md5),
        contents='1')

    args_wrong_contents = PerformParallelUploadFileToObjectArgs(
        fpath_wrong_contents, 0, 1, fpath_wrong_contents_url,
        object_wrong_contents_url, '', empty_object, tracker_file,
        tracker_file_lock)

    # Exists in tracker file, but component object no longer exists.
    fpath_remote_deleted = self.CreateTempFile(file_name='foo5', contents='5')
    fpath_remote_deleted_url = StorageUrlFromString(
        str(fpath_remote_deleted))
    args_remote_deleted = PerformParallelUploadFileToObjectArgs(
        fpath_remote_deleted, 0, 1, fpath_remote_deleted_url, '', '',
        empty_object, tracker_file, tracker_file_lock)

    # Exists in tracker file and already uploaded, but no longer needed.
    fpath_no_longer_used = self.CreateTempFile(file_name='foo6', contents='6')
    with open(fpath_no_longer_used) as f_in:
      file_md5 = CalculateB64EncodedMd5FromContents(f_in)
    mock_api.MockCreateObjectWithMetadata(
        apitools_messages.Object(bucket=bucket_name,
                                 name='foo6', md5Hash=file_md5), contents='6')

    dst_args = {fpath_uploaded_correctly: args_uploaded_correctly,
                fpath_not_uploaded: args_not_uploaded,
                fpath_wrong_contents: args_wrong_contents,
                fpath_remote_deleted: args_remote_deleted}

    existing_components = [ObjectFromTracker(fpath_uploaded_correctly, ''),
                           ObjectFromTracker(fpath_wrong_contents, ''),
                           ObjectFromTracker(fpath_remote_deleted, ''),
                           ObjectFromTracker(fpath_no_longer_used, '')]

    bucket_url = StorageUrlFromString('%s://%s' % (self.default_provider,
                                                   bucket_name))

    (components_to_upload, uploaded_components, existing_objects_to_delete) = (
        FilterExistingComponents(dst_args, existing_components,
                                 bucket_url, mock_api))

    for arg in [args_not_uploaded, args_wrong_contents, args_remote_deleted]:
      self.assertTrue(arg in components_to_upload)
    self.assertEqual(1, len(uploaded_components))
    self.assertEqual(args_uploaded_correctly.dst_url.url_string,
                     uploaded_components[0].url_string)
    self.assertEqual(1, len(existing_objects_to_delete))
    no_longer_used_url = StorageUrlFromString('%s://%s/%s' % (
        self.default_provider, bucket_name, fpath_no_longer_used))
    self.assertEqual(no_longer_used_url.url_string,
                     existing_objects_to_delete[0].url_string)

  def test_FilterExistingComponentsVersioned(self):
    """Tests upload with versioned parallel components."""

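    # In contrast to the non-versioned test above, a versioned bucket retains
    # superseded object generations, so stale and duplicate components are
    # expected to appear in existing_objects_to_delete rather than simply
    # being overwritten.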
    mock_api = MockCloudApi()
    bucket_name = self.MakeTempName('bucket')
    mock_api.MockCreateVersionedBucket(bucket_name)

    # dst_obj_metadata used for passing content-type.
    empty_object = apitools_messages.Object()

    tracker_file = self.CreateTempFile(file_name='foo', contents='asdf')
    tracker_file_lock = CreateLock()

    # Already uploaded, contents still match, component still used.
    fpath_uploaded_correctly = self.CreateTempFile(file_name='foo1',
                                                   contents='1')
    fpath_uploaded_correctly_url = StorageUrlFromString(
        str(fpath_uploaded_correctly))
    with open(fpath_uploaded_correctly) as f_in:
      fpath_uploaded_correctly_md5 = CalculateB64EncodedMd5FromContents(f_in)
    object_uploaded_correctly = mock_api.MockCreateObjectWithMetadata(
        apitools_messages.Object(bucket=bucket_name,
                                 name=fpath_uploaded_correctly,
                                 md5Hash=fpath_uploaded_correctly_md5),
        contents='1')
    object_uploaded_correctly_url = StorageUrlFromString('%s://%s/%s#%s' % (
        self.default_provider, bucket_name,
        fpath_uploaded_correctly, object_uploaded_correctly.generation))
    args_uploaded_correctly = PerformParallelUploadFileToObjectArgs(
        fpath_uploaded_correctly, 0, 1, fpath_uploaded_correctly_url,
        object_uploaded_correctly_url, object_uploaded_correctly.generation,
        empty_object, tracker_file, tracker_file_lock)

    # Duplicate object name in tracker file, but uploaded correctly.
    fpath_duplicate = fpath_uploaded_correctly
    fpath_duplicate_url = StorageUrlFromString(str(fpath_duplicate))
    duplicate_uploaded_correctly = mock_api.MockCreateObjectWithMetadata(
        apitools_messages.Object(bucket=bucket_name,
                                 name=fpath_duplicate,
                                 md5Hash=fpath_uploaded_correctly_md5),
        contents='1')
    duplicate_uploaded_correctly_url = StorageUrlFromString('%s://%s/%s#%s' % (
        self.default_provider, bucket_name,
        fpath_uploaded_correctly, duplicate_uploaded_correctly.generation))
    args_duplicate = PerformParallelUploadFileToObjectArgs(
        fpath_duplicate, 0, 1, fpath_duplicate_url,
        duplicate_uploaded_correctly_url,
        duplicate_uploaded_correctly.generation, empty_object, tracker_file,
        tracker_file_lock)

    # Already uploaded, but contents no longer match.
    fpath_wrong_contents = self.CreateTempFile(file_name='foo4', contents='4')
    fpath_wrong_contents_url = StorageUrlFromString(str(fpath_wrong_contents))
    with open(self.CreateTempFile(contents='_')) as f_in:
      fpath_wrong_contents_md5 = CalculateB64EncodedMd5FromContents(f_in)
    object_wrong_contents = mock_api.MockCreateObjectWithMetadata(
        apitools_messages.Object(bucket=bucket_name,
                                 name=fpath_wrong_contents,
                                 md5Hash=fpath_wrong_contents_md5),
        contents='_')
    wrong_contents_url = StorageUrlFromString('%s://%s/%s#%s' % (
        self.default_provider, bucket_name,
        fpath_wrong_contents, object_wrong_contents.generation))
    args_wrong_contents = PerformParallelUploadFileToObjectArgs(
        fpath_wrong_contents, 0, 1, fpath_wrong_contents_url,
        wrong_contents_url, '', empty_object, tracker_file,
        tracker_file_lock)

    dst_args = {fpath_uploaded_correctly: args_uploaded_correctly,
                fpath_wrong_contents: args_wrong_contents}

    existing_components = [
        ObjectFromTracker(fpath_uploaded_correctly,
                          object_uploaded_correctly_url.generation),
        ObjectFromTracker(fpath_duplicate,
                          duplicate_uploaded_correctly_url.generation),
        ObjectFromTracker(fpath_wrong_contents,
                          wrong_contents_url.generation)]

    bucket_url = StorageUrlFromString('%s://%s' % (self.default_provider,
                                                   bucket_name))

    (components_to_upload, uploaded_components, existing_objects_to_delete) = (
        FilterExistingComponents(dst_args, existing_components,
                                 bucket_url, mock_api))

    self.assertEqual([args_wrong_contents], components_to_upload)
    self.assertEqual(args_uploaded_correctly.dst_url.url_string,
                     uploaded_components[0].url_string)
    expected_to_delete = [(args_wrong_contents.dst_url.object_name,
                           args_wrong_contents.dst_url.generation),
                          (args_duplicate.dst_url.object_name,
                           args_duplicate.dst_url.generation)]
    for uri in existing_objects_to_delete:
      self.assertTrue((uri.object_name, uri.generation) in expected_to_delete)
    self.assertEqual(len(expected_to_delete), len(existing_objects_to_delete))

  # pylint: disable=protected-access
  def test_TranslateApitoolsResumableUploadException(self):
    """Tests that _TranslateApitoolsResumableUploadException works correctly."""
    gsutil_api = GcsJsonApi(
        GSMockBucketStorageUri,
        CreateGsutilLogger('copy_test'))

    gsutil_api.http.disable_ssl_certificate_validation = True
    exc = apitools_exceptions.HttpError({'status': 503}, None, None)
    translated_exc = gsutil_api._TranslateApitoolsResumableUploadException(exc)
    self.assertTrue(isinstance(translated_exc, ServiceException))

    gsutil_api.http.disable_ssl_certificate_validation = False
    exc = apitools_exceptions.HttpError({'status': 503}, None, None)
    translated_exc = gsutil_api._TranslateApitoolsResumableUploadException(exc)
    self.assertTrue(isinstance(translated_exc, ResumableUploadException))

    gsutil_api.http.disable_ssl_certificate_validation = False
    exc = apitools_exceptions.HttpError({'status': 429}, None, None)
    translated_exc = gsutil_api._TranslateApitoolsResumableUploadException(exc)
    self.assertTrue(isinstance(translated_exc, ResumableUploadException))

    exc = apitools_exceptions.HttpError({'status': 410}, None, None)
    translated_exc = gsutil_api._TranslateApitoolsResumableUploadException(exc)
    self.assertTrue(isinstance(translated_exc,
                               ResumableUploadStartOverException))

    exc = apitools_exceptions.HttpError({'status': 404}, None, None)
    translated_exc = gsutil_api._TranslateApitoolsResumableUploadException(exc)
    self.assertTrue(isinstance(translated_exc,
                               ResumableUploadStartOverException))

    exc = apitools_exceptions.HttpError({'status': 401}, None, None)
    translated_exc = gsutil_api._TranslateApitoolsResumableUploadException(exc)
    self.assertTrue(isinstance(translated_exc, ResumableUploadAbortException))

    exc = apitools_exceptions.TransferError('Aborting transfer')
    translated_exc = gsutil_api._TranslateApitoolsResumableUploadException(exc)
    self.assertTrue(isinstance(translated_exc, ResumableUploadAbortException))