Index: tools/telemetry/third_party/gsutilz/gslib/tests/test_parallelism_framework.py |
diff --git a/tools/telemetry/third_party/gsutilz/gslib/tests/test_parallelism_framework.py b/tools/telemetry/third_party/gsutilz/gslib/tests/test_parallelism_framework.py |
index e895c1680a4c99fcfbaa116fc9f95cc3af06c53b..b3b9f46712567d9e3626a36c249014542f4da147 100644 |
--- a/tools/telemetry/third_party/gsutilz/gslib/tests/test_parallelism_framework.py |
+++ b/tools/telemetry/third_party/gsutilz/gslib/tests/test_parallelism_framework.py |
@@ -24,7 +24,10 @@ |
from __future__ import absolute_import |
import functools |
+import os |
import signal |
+import threading |
+import time |
from boto.storage_uri import BucketStorageUri |
from gslib import cs_api_map |
@@ -34,8 +37,8 @@ from gslib.command import DummyArgChecker |
import gslib.tests.testcase as testcase |
from gslib.tests.testcase.base import RequiresIsolation |
from gslib.tests.util import unittest |
+from gslib.util import CheckMultiprocessingAvailableAndInit |
from gslib.util import IS_WINDOWS |
-from gslib.util import MultiprocessingIsAvailable |
# Amount of time for an individual test to run before timing out. We need a |
@@ -74,6 +77,18 @@ def _ReturnOneValue(cls, args, thread_state=None): |
return 1 |
+def _ReturnProcAndThreadId(cls, args, thread_state=None): |
+ return (os.getpid(), threading.currentThread().ident) |
+ |
+ |
+def _SleepThenReturnProcAndThreadId(cls, args, thread_state=None): |
+ # This can fail if the total time to spawn new processes and threads takes |
+ # longer than 5 seconds, but if that occurs, then we have a performance |
+ # problem that needs to be addressed. |
+ time.sleep(5) |
+ return _ReturnProcAndThreadId(cls, args, thread_state=thread_state) |
+ |
+ |
def _FailureFunc(cls, args, thread_state=None): |
raise CustomException('Failing on purpose.') |
@@ -214,7 +229,8 @@ class FakeCommand(Command): |
self.logger = CreateGsutilLogger('FakeCommand') |
self.parallel_operations = do_parallel |
self.failure_count = 0 |
- self.multiprocessing_is_available = MultiprocessingIsAvailable()[0] |
+ self.multiprocessing_is_available = ( |
+ CheckMultiprocessingAvailableAndInit().is_available) |
self.debug = 0 |
@@ -272,6 +288,75 @@ class TestParallelismFramework(testcase.GsUtilUnitTestCase): |
self.assertEqual(len(args), len(results)) |
@RequiresIsolation |
+ def testNoTasksSingleProcessSingleThread(self): |
+ self._TestApplyWithNoTasks(1, 1) |
+ |
+ @RequiresIsolation |
+ def testNoTasksSingleProcessMultiThread(self): |
+ self._TestApplyWithNoTasks(1, 3) |
+ |
+ @RequiresIsolation |
+ @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') |
+ def testNoTasksMultiProcessSingleThread(self): |
+ self._TestApplyWithNoTasks(3, 1) |
+ |
+ @RequiresIsolation |
+ @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') |
+ def testNoTasksMultiProcessMultiThread(self): |
+ self._TestApplyWithNoTasks(3, 3) |
+ |
+ @Timeout |
+ def _TestApplyWithNoTasks(self, process_count, thread_count): |
+ """Tests that calling Apply with no tasks releases locks/semaphores.""" |
+ empty_args = [()] |
+ |
+ for _ in range(process_count * thread_count + 1): |
+ self._RunApply(_ReturnOneValue, empty_args, process_count, thread_count) |
+ |
+ # Ensure that work can still be performed. |
+ self._TestBasicApply(process_count, thread_count) |
+ |
+ @RequiresIsolation |
+ @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') |
+ def testApplySaturatesMultiProcessSingleThread(self): |
+ self._TestApplySaturatesAvailableProcessesAndThreads(3, 1) |
+ |
+ @RequiresIsolation |
+ def testApplySaturatesSingleProcessMultiThread(self): |
+ self._TestApplySaturatesAvailableProcessesAndThreads(1, 3) |
+ |
+ @RequiresIsolation |
+ @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') |
+ def testApplySaturatesMultiProcessMultiThread(self): |
+ self._TestApplySaturatesAvailableProcessesAndThreads(3, 3) |
+ |
+ @RequiresIsolation |
+ def _TestApplySaturatesAvailableProcessesAndThreads(self, process_count, |
+ thread_count): |
+ """Tests that created processes and threads evenly share tasks.""" |
+ calls_per_thread = 2 |
+ args = [()] * (process_count * thread_count * calls_per_thread) |
+ expected_calls_per_thread = calls_per_thread |
+ |
+ if not self.command_class(True).multiprocessing_is_available: |
+ # When multiprocessing is unavailable, only a single process is used. |
+ # Calls should be evenly distributed across threads. |
+ expected_calls_per_thread = calls_per_thread * process_count |
+ |
+ results = self._RunApply(_SleepThenReturnProcAndThreadId, args, |
+ process_count, thread_count) |
+ usage_dict = {} # (process_id, thread_id): number of tasks performed |
+ for (process_id, thread_id) in results: |
+ usage_dict[(process_id, thread_id)] = ( |
+ usage_dict.get((process_id, thread_id), 0) + 1) |
+ |
+ for (id_tuple, num_tasks_completed) in usage_dict.iteritems(): |
+ self.assertEqual(num_tasks_completed, expected_calls_per_thread, |
+ 'Process %s thread %s completed %s tasks. Expected: %s' % |
+ (id_tuple[0], id_tuple[1], num_tasks_completed, |
+ expected_calls_per_thread)) |
+ |
+ @RequiresIsolation |
def testIteratorFailureSingleProcessSingleThread(self): |
self._TestIteratorFailure(1, 1) |
@@ -353,12 +438,22 @@ class TestParallelismFramework(testcase.GsUtilUnitTestCase): |
expected_sum += len(arg) |
self.assertEqual(expected_sum, command_inst.arg_length_sum) |
- # Test that shared variables work when the iterator fails. |
- command_inst = self.command_class(True) |
- args = FailingIterator(10, [1, 3, 5]) |
- self._RunApply(_ReturnOneValue, args, process_count, thread_count, |
- command_inst=command_inst, shared_attrs=['failure_count']) |
- self.assertEqual(3, command_inst.failure_count) |
+ # Test that shared variables work when the iterator fails at the beginning, |
+ # middle, and end. |
+ for (failing_iterator, expected_failure_count) in ( |
+ (FailingIterator(5, [0]), 1), |
+ (FailingIterator(10, [1, 3, 5]), 3), |
+ (FailingIterator(5, [4]), 1)): |
+ command_inst = self.command_class(True) |
+ args = failing_iterator |
+ self._RunApply(_ReturnOneValue, args, process_count, thread_count, |
+ command_inst=command_inst, shared_attrs=['failure_count']) |
+ self.assertEqual( |
+ expected_failure_count, command_inst.failure_count, |
+ msg='Failure count did not match. Expected: %s, actual: %s ' |
+ 'for failing iterator of size %s, failing indices %s' % |
+ (expected_failure_count, command_inst.failure_count, |
+ failing_iterator.size, failing_iterator.failure_indices)) |
@RequiresIsolation |
def testThreadsSurviveExceptionsInFuncSingleProcessSingleThread(self): |
@@ -564,11 +659,11 @@ class TestParallelismFrameworkWithoutMultiprocessing(TestParallelismFramework): |
"""Tests parallelism framework works with multiprocessing module unavailable. |
Notably, this test has no way to override previous calls |
- to gslib.util.MultiprocessingIsAvailable to prevent the initialization of |
- all of the global variables in command.py, so this still behaves slightly |
- differently than the behavior one would see on a machine where the |
- multiprocessing functionality is actually not available (in particular, it |
- will not catch the case where a global variable that is not available for |
- the sequential path is referenced before initialization). |
+ to gslib.util.CheckMultiprocessingAvailableAndInit to prevent the |
+ initialization of all of the global variables in command.py, so this still |
+ behaves slightly differently than the behavior one would see on a machine |
+ where the multiprocessing functionality is actually not available (in |
+ particular, it will not catch the case where a global variable that is not |
+ available for the sequential path is referenced before initialization). |
""" |
command_class = FakeCommandWithoutMultiprocessingModule |