OLD | NEW |
1 # -*- coding: utf-8 -*- | 1 # -*- coding: utf-8 -*- |
2 # Copyright 2013 Google Inc. All Rights Reserved. | 2 # Copyright 2013 Google Inc. All Rights Reserved. |
3 # | 3 # |
4 # Permission is hereby granted, free of charge, to any person obtaining a | 4 # Permission is hereby granted, free of charge, to any person obtaining a |
5 # copy of this software and associated documentation files (the | 5 # copy of this software and associated documentation files (the |
6 # "Software"), to deal in the Software without restriction, including | 6 # "Software"), to deal in the Software without restriction, including |
7 # without limitation the rights to use, copy, modify, merge, publish, dis- | 7 # without limitation the rights to use, copy, modify, merge, publish, dis- |
8 # tribute, sublicense, and/or sell copies of the Software, and to permit | 8 # tribute, sublicense, and/or sell copies of the Software, and to permit |
9 # persons to whom the Software is furnished to do so, subject to the fol- | 9 # persons to whom the Software is furnished to do so, subject to the fol- |
10 # lowing conditions: | 10 # lowing conditions: |
11 # | 11 # |
12 # The above copyright notice and this permission notice shall be included | 12 # The above copyright notice and this permission notice shall be included |
13 # in all copies or substantial portions of the Software. | 13 # in all copies or substantial portions of the Software. |
14 # | 14 # |
15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS | 15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS |
16 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- | 16 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- |
17 # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT | 17 # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT |
18 # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, | 18 # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, |
19 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | 19 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
20 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS | 20 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS |
21 # IN THE SOFTWARE. | 21 # IN THE SOFTWARE. |
22 """Unit tests for gsutil parallelism framework.""" | 22 """Unit tests for gsutil parallelism framework.""" |
23 | 23 |
24 from __future__ import absolute_import | 24 from __future__ import absolute_import |
25 | 25 |
26 import functools | 26 import functools |
| 27 import os |
27 import signal | 28 import signal |
| 29 import threading |
| 30 import time |
28 | 31 |
29 from boto.storage_uri import BucketStorageUri | 32 from boto.storage_uri import BucketStorageUri |
30 from gslib import cs_api_map | 33 from gslib import cs_api_map |
31 from gslib.command import Command | 34 from gslib.command import Command |
32 from gslib.command import CreateGsutilLogger | 35 from gslib.command import CreateGsutilLogger |
33 from gslib.command import DummyArgChecker | 36 from gslib.command import DummyArgChecker |
34 import gslib.tests.testcase as testcase | 37 import gslib.tests.testcase as testcase |
35 from gslib.tests.testcase.base import RequiresIsolation | 38 from gslib.tests.testcase.base import RequiresIsolation |
36 from gslib.tests.util import unittest | 39 from gslib.tests.util import unittest |
| 40 from gslib.util import CheckMultiprocessingAvailableAndInit |
37 from gslib.util import IS_WINDOWS | 41 from gslib.util import IS_WINDOWS |
38 from gslib.util import MultiprocessingIsAvailable | |
39 | 42 |
40 | 43 |
41 # Amount of time for an individual test to run before timing out. We need a | 44 # Amount of time for an individual test to run before timing out. We need a |
42 # reasonably high value since if many tests are running in parallel, an | 45 # reasonably high value since if many tests are running in parallel, an |
43 # individual test may take a while to complete. | 46 # individual test may take a while to complete. |
44 _TEST_TIMEOUT_SECONDS = 120 | 47 _TEST_TIMEOUT_SECONDS = 120 |
45 | 48 |
46 | 49 |
47 def Timeout(func): | 50 def Timeout(func): |
48 """Decorator used to provide a timeout for functions.""" | 51 """Decorator used to provide a timeout for functions.""" |
(...skipping 18 matching lines...) Expand all Loading... |
67 class CustomException(Exception): | 70 class CustomException(Exception): |
68 | 71 |
69 def __init__(self, exception_str): | 72 def __init__(self, exception_str): |
70 super(CustomException, self).__init__(exception_str) | 73 super(CustomException, self).__init__(exception_str) |
71 | 74 |
72 | 75 |
73 def _ReturnOneValue(cls, args, thread_state=None): | 76 def _ReturnOneValue(cls, args, thread_state=None): |
74 return 1 | 77 return 1 |
75 | 78 |
76 | 79 |
| 80 def _ReturnProcAndThreadId(cls, args, thread_state=None): |
| 81 return (os.getpid(), threading.currentThread().ident) |
| 82 |
| 83 |
| 84 def _SleepThenReturnProcAndThreadId(cls, args, thread_state=None): |
| 85 # This can fail if the total time to spawn new processes and threads takes |
| 86 # longer than 5 seconds, but if that occurs, then we have a performance |
| 87 # problem that needs to be addressed. |
| 88 time.sleep(5) |
| 89 return _ReturnProcAndThreadId(cls, args, thread_state=thread_state) |
| 90 |
| 91 |
77 def _FailureFunc(cls, args, thread_state=None): | 92 def _FailureFunc(cls, args, thread_state=None): |
78 raise CustomException('Failing on purpose.') | 93 raise CustomException('Failing on purpose.') |
79 | 94 |
80 | 95 |
81 def _FailingExceptionHandler(cls, e): | 96 def _FailingExceptionHandler(cls, e): |
82 cls.failure_count += 1 | 97 cls.failure_count += 1 |
83 raise CustomException('Exception handler failing on purpose.') | 98 raise CustomException('Exception handler failing on purpose.') |
84 | 99 |
85 | 100 |
86 def _ExceptionHandler(cls, e): | 101 def _ExceptionHandler(cls, e): |
(...skipping 120 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
207 } | 222 } |
208 default_map = { | 223 default_map = { |
209 'gs': 'JSON', | 224 'gs': 'JSON', |
210 's3': 'XML' | 225 's3': 'XML' |
211 } | 226 } |
212 self.gsutil_api_map = cs_api_map.GsutilApiMapFactory.GetApiMap( | 227 self.gsutil_api_map = cs_api_map.GsutilApiMapFactory.GetApiMap( |
213 cs_api_map.GsutilApiClassMapFactory, support_map, default_map) | 228 cs_api_map.GsutilApiClassMapFactory, support_map, default_map) |
214 self.logger = CreateGsutilLogger('FakeCommand') | 229 self.logger = CreateGsutilLogger('FakeCommand') |
215 self.parallel_operations = do_parallel | 230 self.parallel_operations = do_parallel |
216 self.failure_count = 0 | 231 self.failure_count = 0 |
217 self.multiprocessing_is_available = MultiprocessingIsAvailable()[0] | 232 self.multiprocessing_is_available = ( |
| 233 CheckMultiprocessingAvailableAndInit().is_available) |
218 self.debug = 0 | 234 self.debug = 0 |
219 | 235 |
220 | 236 |
221 class FakeCommandWithoutMultiprocessingModule(FakeCommand): | 237 class FakeCommandWithoutMultiprocessingModule(FakeCommand): |
222 | 238 |
223 def __init__(self, do_parallel): | 239 def __init__(self, do_parallel): |
224 super(FakeCommandWithoutMultiprocessingModule, self).__init__(do_parallel) | 240 super(FakeCommandWithoutMultiprocessingModule, self).__init__(do_parallel) |
225 self.multiprocessing_is_available = False | 241 self.multiprocessing_is_available = False |
226 | 242 |
227 | 243 |
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
265 self._TestBasicApply(3, 3) | 281 self._TestBasicApply(3, 3) |
266 | 282 |
267 @Timeout | 283 @Timeout |
268 def _TestBasicApply(self, process_count, thread_count): | 284 def _TestBasicApply(self, process_count, thread_count): |
269 args = [()] * (17 * process_count * thread_count + 1) | 285 args = [()] * (17 * process_count * thread_count + 1) |
270 | 286 |
271 results = self._RunApply(_ReturnOneValue, args, process_count, thread_count) | 287 results = self._RunApply(_ReturnOneValue, args, process_count, thread_count) |
272 self.assertEqual(len(args), len(results)) | 288 self.assertEqual(len(args), len(results)) |
273 | 289 |
274 @RequiresIsolation | 290 @RequiresIsolation |
| 291 def testNoTasksSingleProcessSingleThread(self): |
| 292 self._TestApplyWithNoTasks(1, 1) |
| 293 |
| 294 @RequiresIsolation |
| 295 def testNoTasksSingleProcessMultiThread(self): |
| 296 self._TestApplyWithNoTasks(1, 3) |
| 297 |
| 298 @RequiresIsolation |
| 299 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') |
| 300 def testNoTasksMultiProcessSingleThread(self): |
| 301 self._TestApplyWithNoTasks(3, 1) |
| 302 |
| 303 @RequiresIsolation |
| 304 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') |
| 305 def testNoTasksMultiProcessMultiThread(self): |
| 306 self._TestApplyWithNoTasks(3, 3) |
| 307 |
| 308 @Timeout |
| 309 def _TestApplyWithNoTasks(self, process_count, thread_count): |
| 310 """Tests that calling Apply with no tasks releases locks/semaphores.""" |
| 311 empty_args = [()] |
| 312 |
| 313 for _ in range(process_count * thread_count + 1): |
| 314 self._RunApply(_ReturnOneValue, empty_args, process_count, thread_count) |
| 315 |
| 316 # Ensure that work can still be performed. |
| 317 self._TestBasicApply(process_count, thread_count) |
| 318 |
| 319 @RequiresIsolation |
| 320 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') |
| 321 def testApplySaturatesMultiProcessSingleThread(self): |
| 322 self._TestApplySaturatesAvailableProcessesAndThreads(3, 1) |
| 323 |
| 324 @RequiresIsolation |
| 325 def testApplySaturatesSingleProcessMultiThread(self): |
| 326 self._TestApplySaturatesAvailableProcessesAndThreads(1, 3) |
| 327 |
| 328 @RequiresIsolation |
| 329 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') |
| 330 def testApplySaturatesMultiProcessMultiThread(self): |
| 331 self._TestApplySaturatesAvailableProcessesAndThreads(3, 3) |
| 332 |
| 333 @RequiresIsolation |
| 334 def _TestApplySaturatesAvailableProcessesAndThreads(self, process_count, |
| 335 thread_count): |
| 336 """Tests that created processes and threads evenly share tasks.""" |
| 337 calls_per_thread = 2 |
| 338 args = [()] * (process_count * thread_count * calls_per_thread) |
| 339 expected_calls_per_thread = calls_per_thread |
| 340 |
| 341 if not self.command_class(True).multiprocessing_is_available: |
| 342 # When multiprocessing is unavailable, only a single process is used. |
| 343 # Calls should be evenly distributed across threads. |
| 344 expected_calls_per_thread = calls_per_thread * process_count |
| 345 |
| 346 results = self._RunApply(_SleepThenReturnProcAndThreadId, args, |
| 347 process_count, thread_count) |
| 348 usage_dict = {} # (process_id, thread_id): number of tasks performed |
| 349 for (process_id, thread_id) in results: |
| 350 usage_dict[(process_id, thread_id)] = ( |
| 351 usage_dict.get((process_id, thread_id), 0) + 1) |
| 352 |
| 353 for (id_tuple, num_tasks_completed) in usage_dict.iteritems(): |
| 354 self.assertEqual(num_tasks_completed, expected_calls_per_thread, |
| 355 'Process %s thread %s completed %s tasks. Expected: %s' % |
| 356 (id_tuple[0], id_tuple[1], num_tasks_completed, |
| 357 expected_calls_per_thread)) |
| 358 |
| 359 @RequiresIsolation |
275 def testIteratorFailureSingleProcessSingleThread(self): | 360 def testIteratorFailureSingleProcessSingleThread(self): |
276 self._TestIteratorFailure(1, 1) | 361 self._TestIteratorFailure(1, 1) |
277 | 362 |
278 @RequiresIsolation | 363 @RequiresIsolation |
279 def testIteratorFailureSingleProcessMultiThread(self): | 364 def testIteratorFailureSingleProcessMultiThread(self): |
280 self._TestIteratorFailure(1, 3) | 365 self._TestIteratorFailure(1, 3) |
281 | 366 |
282 @RequiresIsolation | 367 @RequiresIsolation |
283 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') | 368 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') |
284 def testIteratorFailureMultiProcessSingleThread(self): | 369 def testIteratorFailureMultiProcessSingleThread(self): |
(...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
346 command_inst.arg_length_sum = 19 | 431 command_inst.arg_length_sum = 19 |
347 args = ['foo', ['bar', 'baz'], [], ['x', 'y'], [], 'abcd'] | 432 args = ['foo', ['bar', 'baz'], [], ['x', 'y'], [], 'abcd'] |
348 self._RunApply(_IncrementByLength, args, process_count, | 433 self._RunApply(_IncrementByLength, args, process_count, |
349 thread_count, command_inst=command_inst, | 434 thread_count, command_inst=command_inst, |
350 shared_attrs=['arg_length_sum']) | 435 shared_attrs=['arg_length_sum']) |
351 expected_sum = 19 | 436 expected_sum = 19 |
352 for arg in args: | 437 for arg in args: |
353 expected_sum += len(arg) | 438 expected_sum += len(arg) |
354 self.assertEqual(expected_sum, command_inst.arg_length_sum) | 439 self.assertEqual(expected_sum, command_inst.arg_length_sum) |
355 | 440 |
356 # Test that shared variables work when the iterator fails. | 441 # Test that shared variables work when the iterator fails at the beginning, |
357 command_inst = self.command_class(True) | 442 # middle, and end. |
358 args = FailingIterator(10, [1, 3, 5]) | 443 for (failing_iterator, expected_failure_count) in ( |
359 self._RunApply(_ReturnOneValue, args, process_count, thread_count, | 444 (FailingIterator(5, [0]), 1), |
360 command_inst=command_inst, shared_attrs=['failure_count']) | 445 (FailingIterator(10, [1, 3, 5]), 3), |
361 self.assertEqual(3, command_inst.failure_count) | 446 (FailingIterator(5, [4]), 1)): |
| 447 command_inst = self.command_class(True) |
| 448 args = failing_iterator |
| 449 self._RunApply(_ReturnOneValue, args, process_count, thread_count, |
| 450 command_inst=command_inst, shared_attrs=['failure_count']) |
| 451 self.assertEqual( |
| 452 expected_failure_count, command_inst.failure_count, |
| 453 msg='Failure count did not match. Expected: %s, actual: %s ' |
| 454 'for failing iterator of size %s, failing indices %s' % |
| 455 (expected_failure_count, command_inst.failure_count, |
| 456 failing_iterator.size, failing_iterator.failure_indices)) |
362 | 457 |
363 @RequiresIsolation | 458 @RequiresIsolation |
364 def testThreadsSurviveExceptionsInFuncSingleProcessSingleThread(self): | 459 def testThreadsSurviveExceptionsInFuncSingleProcessSingleThread(self): |
365 self._TestThreadsSurviveExceptionsInFunc(1, 1) | 460 self._TestThreadsSurviveExceptionsInFunc(1, 1) |
366 | 461 |
367 @RequiresIsolation | 462 @RequiresIsolation |
368 def testThreadsSurviveExceptionsInFuncSingleProcessMultiThread(self): | 463 def testThreadsSurviveExceptionsInFuncSingleProcessMultiThread(self): |
369 self._TestThreadsSurviveExceptionsInFunc(1, 3) | 464 self._TestThreadsSurviveExceptionsInFunc(1, 3) |
370 | 465 |
371 @RequiresIsolation | 466 @RequiresIsolation |
(...skipping 185 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
557 args = [2 * x for x in args] | 652 args = [2 * x for x in args] |
558 results = self._RunApply(_ReturnOneValue, args, process_count, thread_count, | 653 results = self._RunApply(_ReturnOneValue, args, process_count, thread_count, |
559 arg_checker=_SkipEvenNumbersArgChecker) | 654 arg_checker=_SkipEvenNumbersArgChecker) |
560 self.assertEqual(0, len(results)) | 655 self.assertEqual(0, len(results)) |
561 | 656 |
562 | 657 |
563 class TestParallelismFrameworkWithoutMultiprocessing(TestParallelismFramework): | 658 class TestParallelismFrameworkWithoutMultiprocessing(TestParallelismFramework): |
564 """Tests parallelism framework works with multiprocessing module unavailable. | 659 """Tests parallelism framework works with multiprocessing module unavailable. |
565 | 660 |
566 Notably, this test has no way to override previous calls | 661 Notably, this test has no way to override previous calls |
567 to gslib.util.MultiprocessingIsAvailable to prevent the initialization of | 662 to gslib.util.CheckMultiprocessingAvailableAndInit to prevent the |
568 all of the global variables in command.py, so this still behaves slightly | 663 initialization of all of the global variables in command.py, so this still |
569 differently than the behavior one would see on a machine where the | 664 behaves slightly differently than the behavior one would see on a machine |
570 multiprocessing functionality is actually not available (in particular, it | 665 where the multiprocessing functionality is actually not available (in |
571 will not catch the case where a global variable that is not available for | 666 particular, it will not catch the case where a global variable that is not |
572 the sequential path is referenced before initialization). | 667 available for the sequential path is referenced before initialization). |
573 """ | 668 """ |
574 command_class = FakeCommandWithoutMultiprocessingModule | 669 command_class = FakeCommandWithoutMultiprocessingModule |
OLD | NEW |