Index: third_party/gsutil/gslib/tests/test_parallelism_framework.py
diff --git a/third_party/gsutil/gslib/tests/test_parallelism_framework.py b/third_party/gsutil/gslib/tests/test_parallelism_framework.py
index e895c1680a4c99fcfbaa116fc9f95cc3af06c53b..b3b9f46712567d9e3626a36c249014542f4da147 100644
--- a/third_party/gsutil/gslib/tests/test_parallelism_framework.py
+++ b/third_party/gsutil/gslib/tests/test_parallelism_framework.py
@@ -24,7 +24,10 @@
 from __future__ import absolute_import

 import functools
+import os
 import signal
+import threading
+import time

 from boto.storage_uri import BucketStorageUri
 from gslib import cs_api_map
@@ -34,8 +37,8 @@ from gslib.command import DummyArgChecker
 import gslib.tests.testcase as testcase
 from gslib.tests.testcase.base import RequiresIsolation
 from gslib.tests.util import unittest
+from gslib.util import CheckMultiprocessingAvailableAndInit
 from gslib.util import IS_WINDOWS
-from gslib.util import MultiprocessingIsAvailable


 # Amount of time for an individual test to run before timing out. We need a
@@ -74,6 +77,18 @@ def _ReturnOneValue(cls, args, thread_state=None):
   return 1


+def _ReturnProcAndThreadId(cls, args, thread_state=None):
+  return (os.getpid(), threading.currentThread().ident)
+
+
+def _SleepThenReturnProcAndThreadId(cls, args, thread_state=None):
+  # This can fail if the total time to spawn new processes and threads takes
+  # longer than 5 seconds, but if that occurs, then we have a performance
+  # problem that needs to be addressed.
+  time.sleep(5)
+  return _ReturnProcAndThreadId(cls, args, thread_state=thread_state)
+
+
 def _FailureFunc(cls, args, thread_state=None):
   raise CustomException('Failing on purpose.')

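The comment above pins down the timing assumption behind the saturation tests added further below: a worker that takes a task stays busy for 5 seconds, so every remaining task has to be picked up by a different, newly spawned worker, provided all processes and threads can be spawned well inside that window. A self-contained sketch for sanity-checking that assumption on a given machine (illustrative only, not part of this patch; the 3x3 worker counts mirror the multi-process, multi-thread tests):

import multiprocessing
import threading
import time


def _Noop():
  pass


if __name__ == '__main__':
  start = time.time()
  processes = [multiprocessing.Process(target=_Noop) for _ in range(3)]
  threads = [threading.Thread(target=_Noop) for _ in range(3)]
  for worker in processes + threads:
    worker.start()
  for worker in processes + threads:
    worker.join()
  # Expected to come in far below the 5-second sleep used by
  # _SleepThenReturnProcAndThreadId above.
  print('spawn and join overhead: %.2fs' % (time.time() - start))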
@@ -214,7 +229,8 @@ class FakeCommand(Command):
     self.logger = CreateGsutilLogger('FakeCommand')
     self.parallel_operations = do_parallel
     self.failure_count = 0
-    self.multiprocessing_is_available = MultiprocessingIsAvailable()[0]
+    self.multiprocessing_is_available = (
+        CheckMultiprocessingAvailableAndInit().is_available)
     self.debug = 0


@@ -272,6 +288,75 @@ class TestParallelismFramework(testcase.GsUtilUnitTestCase):
     self.assertEqual(len(args), len(results))

   @RequiresIsolation
+  def testNoTasksSingleProcessSingleThread(self):
+    self._TestApplyWithNoTasks(1, 1)
+
+  @RequiresIsolation
+  def testNoTasksSingleProcessMultiThread(self):
+    self._TestApplyWithNoTasks(1, 3)
+
+  @RequiresIsolation
+  @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
+  def testNoTasksMultiProcessSingleThread(self):
+    self._TestApplyWithNoTasks(3, 1)
+
+  @RequiresIsolation
+  @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
+  def testNoTasksMultiProcessMultiThread(self):
+    self._TestApplyWithNoTasks(3, 3)
+
+  @Timeout
+  def _TestApplyWithNoTasks(self, process_count, thread_count):
+    """Tests that calling Apply with no tasks releases locks/semaphores."""
+    empty_args = [()]
+
+    for _ in range(process_count * thread_count + 1):
+      self._RunApply(_ReturnOneValue, empty_args, process_count, thread_count)
+
+    # Ensure that work can still be performed.
+    self._TestBasicApply(process_count, thread_count)
+
+  @RequiresIsolation
+  @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
+  def testApplySaturatesMultiProcessSingleThread(self):
+    self._TestApplySaturatesAvailableProcessesAndThreads(3, 1)
+
+  @RequiresIsolation
+  def testApplySaturatesSingleProcessMultiThread(self):
+    self._TestApplySaturatesAvailableProcessesAndThreads(1, 3)
+
+  @RequiresIsolation
+  @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
+  def testApplySaturatesMultiProcessMultiThread(self):
+    self._TestApplySaturatesAvailableProcessesAndThreads(3, 3)
+
+  @Timeout
+  def _TestApplySaturatesAvailableProcessesAndThreads(self, process_count,
+                                                      thread_count):
+    """Tests that created processes and threads evenly share tasks."""
+    calls_per_thread = 2
+    args = [()] * (process_count * thread_count * calls_per_thread)
+    expected_calls_per_thread = calls_per_thread
+
+    if not self.command_class(True).multiprocessing_is_available:
+      # When multiprocessing is unavailable, only a single process is used.
+      # Calls should be evenly distributed across threads.
+      expected_calls_per_thread = calls_per_thread * process_count
+
+    results = self._RunApply(_SleepThenReturnProcAndThreadId, args,
+                             process_count, thread_count)
+    usage_dict = {}  # (process_id, thread_id): number of tasks performed
+    for (process_id, thread_id) in results:
+      usage_dict[(process_id, thread_id)] = (
+          usage_dict.get((process_id, thread_id), 0) + 1)
+
+    for (id_tuple, num_tasks_completed) in usage_dict.iteritems():
+      self.assertEqual(num_tasks_completed, expected_calls_per_thread,
+                       'Process %s thread %s completed %s tasks. Expected: %s' %
+                       (id_tuple[0], id_tuple[1], num_tasks_completed,
+                        expected_calls_per_thread))
+
+  @RequiresIsolation
   def testIteratorFailureSingleProcessSingleThread(self):
     self._TestIteratorFailure(1, 1)

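To make the expectation in _TestApplySaturatesAvailableProcessesAndThreads concrete: with process_count=3, thread_count=3, and calls_per_thread=2, Apply returns 18 (process id, thread id) pairs, and because each worker sleeps through its task, all 9 workers must be used before any of them can take a second task, so every pair appears exactly twice; without multiprocessing, the single process's 3 threads each appear calls_per_thread * process_count = 6 times. A small, self-contained illustration of the tally (hypothetical ids, not part of this patch):

from collections import Counter

# 3 hypothetical process ids x 3 hypothetical thread ids, 2 calls per worker.
fake_results = [(pid, tid)
                for pid in (1001, 1002, 1003)
                for tid in (11, 22, 33)] * 2

usage = Counter(fake_results)                # same tally as usage_dict above
assert len(usage) == 9                       # every worker was used
assert all(n == 2 for n in usage.values())   # ...and used equally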
@@ -353,12 +438,22 @@ class TestParallelismFramework(testcase.GsUtilUnitTestCase):
       expected_sum += len(arg)
     self.assertEqual(expected_sum, command_inst.arg_length_sum)

-    # Test that shared variables work when the iterator fails.
-    command_inst = self.command_class(True)
-    args = FailingIterator(10, [1, 3, 5])
-    self._RunApply(_ReturnOneValue, args, process_count, thread_count,
-                   command_inst=command_inst, shared_attrs=['failure_count'])
-    self.assertEqual(3, command_inst.failure_count)
+    # Test that shared variables work when the iterator fails at the beginning,
+    # middle, and end.
+    for (failing_iterator, expected_failure_count) in (
+        (FailingIterator(5, [0]), 1),
+        (FailingIterator(10, [1, 3, 5]), 3),
+        (FailingIterator(5, [4]), 1)):
+      command_inst = self.command_class(True)
+      args = failing_iterator
+      self._RunApply(_ReturnOneValue, args, process_count, thread_count,
+                     command_inst=command_inst, shared_attrs=['failure_count'])
+      self.assertEqual(
+          expected_failure_count, command_inst.failure_count,
+          msg='Failure count did not match. Expected: %s, actual: %s '
+              'for failing iterator of size %s, failing indices %s' %
+              (expected_failure_count, command_inst.failure_count,
+               failing_iterator.size, failing_iterator.failure_indices))

   @RequiresIsolation
   def testThreadsSurviveExceptionsInFuncSingleProcessSingleThread(self):
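The loop above assumes FailingIterator (defined earlier in this module, not shown in this patch) raises once for every index in failure_indices yet lets iteration resume afterwards; otherwise failure_count could never reach 3 for FailingIterator(10, [1, 3, 5]). A rough, self-contained equivalent of such an iterator, offered as an assumption about how the real class behaves rather than a copy of it:

class CustomException(Exception):
  """Stand-in for the CustomException used elsewhere in this module."""


class IllustrativeFailingIterator(object):
  """Keeps its position across failures so iteration can resume."""

  def __init__(self, size, failure_indices):
    self.size = size
    self.failure_indices = failure_indices
    self._index = 0

  def __iter__(self):
    return self

  def next(self):  # Python 2 iterator protocol, matching this module.
    if self._index == self.size:
      raise StopIteration()
    current = self._index
    self._index += 1
    if current in self.failure_indices:
      raise CustomException('Failing on purpose at index %d.' % current)
    return current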
@@ -564,11 +659,11 @@ class TestParallelismFrameworkWithoutMultiprocessing(TestParallelismFramework):
   """Tests parallelism framework works with multiprocessing module unavailable.

   Notably, this test has no way to override previous calls
-  to gslib.util.MultiprocessingIsAvailable to prevent the initialization of
-  all of the global variables in command.py, so this still behaves slightly
-  differently than the behavior one would see on a machine where the
-  multiprocessing functionality is actually not available (in particular, it
-  will not catch the case where a global variable that is not available for
-  the sequential path is referenced before initialization).
+  to gslib.util.CheckMultiprocessingAvailableAndInit to prevent the
+  initialization of all of the global variables in command.py, so this still
+  behaves slightly differently than the behavior one would see on a machine
+  where the multiprocessing functionality is actually not available (in
+  particular, it will not catch the case where a global variable that is not
+  available for the sequential path is referenced before initialization).
   """
   command_class = FakeCommandWithoutMultiprocessingModule
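The updated docstring hinges on the same distinction the FakeCommand change above introduces: CheckMultiprocessingAvailableAndInit initializes module-level state in command.py as a side effect, and a test class can only override the per-command flag after that has happened. A sketch of how the FakeCommandWithoutMultiprocessingModule subclass is presumably wired up (the real class is defined earlier in this file and may differ):

class FakeCommandWithoutMultiprocessingModule(FakeCommand):
  """FakeCommand variant that reports multiprocessing as unavailable."""

  def __init__(self, do_parallel):
    super(FakeCommandWithoutMultiprocessingModule, self).__init__(do_parallel)
    # Only the per-command flag is overridden; any global state already
    # initialized by CheckMultiprocessingAvailableAndInit stays as-is, which
    # is the limitation the class docstring describes.
    self.multiprocessing_is_available = False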