Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(215)

Unified Diff: tools/telemetry/third_party/gsutilz/gslib/tests/test_parallelism_framework.py

Issue 1376593003: Roll gsutil version to 4.15. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 5 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: tools/telemetry/third_party/gsutilz/gslib/tests/test_parallelism_framework.py
diff --git a/tools/telemetry/third_party/gsutilz/gslib/tests/test_parallelism_framework.py b/tools/telemetry/third_party/gsutilz/gslib/tests/test_parallelism_framework.py
index e895c1680a4c99fcfbaa116fc9f95cc3af06c53b..b3b9f46712567d9e3626a36c249014542f4da147 100644
--- a/tools/telemetry/third_party/gsutilz/gslib/tests/test_parallelism_framework.py
+++ b/tools/telemetry/third_party/gsutilz/gslib/tests/test_parallelism_framework.py
@@ -24,7 +24,10 @@
from __future__ import absolute_import
import functools
+import os
import signal
+import threading
+import time
from boto.storage_uri import BucketStorageUri
from gslib import cs_api_map
@@ -34,8 +37,8 @@ from gslib.command import DummyArgChecker
import gslib.tests.testcase as testcase
from gslib.tests.testcase.base import RequiresIsolation
from gslib.tests.util import unittest
+from gslib.util import CheckMultiprocessingAvailableAndInit
from gslib.util import IS_WINDOWS
-from gslib.util import MultiprocessingIsAvailable
# Amount of time for an individual test to run before timing out. We need a
@@ -74,6 +77,18 @@ def _ReturnOneValue(cls, args, thread_state=None):
return 1
+def _ReturnProcAndThreadId(cls, args, thread_state=None):
+ return (os.getpid(), threading.currentThread().ident)
+
+
+def _SleepThenReturnProcAndThreadId(cls, args, thread_state=None):
+ # This can fail if the total time to spawn new processes and threads takes
+ # longer than 5 seconds, but if that occurs, then we have a performance
+ # problem that needs to be addressed.
+ time.sleep(5)
+ return _ReturnProcAndThreadId(cls, args, thread_state=thread_state)
+
+
def _FailureFunc(cls, args, thread_state=None):
raise CustomException('Failing on purpose.')
@@ -214,7 +229,8 @@ class FakeCommand(Command):
self.logger = CreateGsutilLogger('FakeCommand')
self.parallel_operations = do_parallel
self.failure_count = 0
- self.multiprocessing_is_available = MultiprocessingIsAvailable()[0]
+ self.multiprocessing_is_available = (
+ CheckMultiprocessingAvailableAndInit().is_available)
self.debug = 0
@@ -272,6 +288,75 @@ class TestParallelismFramework(testcase.GsUtilUnitTestCase):
self.assertEqual(len(args), len(results))
@RequiresIsolation
+ def testNoTasksSingleProcessSingleThread(self):
+ self._TestApplyWithNoTasks(1, 1)
+
+ @RequiresIsolation
+ def testNoTasksSingleProcessMultiThread(self):
+ self._TestApplyWithNoTasks(1, 3)
+
+ @RequiresIsolation
+ @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
+ def testNoTasksMultiProcessSingleThread(self):
+ self._TestApplyWithNoTasks(3, 1)
+
+ @RequiresIsolation
+ @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
+ def testNoTasksMultiProcessMultiThread(self):
+ self._TestApplyWithNoTasks(3, 3)
+
+ @Timeout
+ def _TestApplyWithNoTasks(self, process_count, thread_count):
+ """Tests that calling Apply with no tasks releases locks/semaphores."""
+ empty_args = [()]
+
+ for _ in range(process_count * thread_count + 1):
+ self._RunApply(_ReturnOneValue, empty_args, process_count, thread_count)
+
+ # Ensure that work can still be performed.
+ self._TestBasicApply(process_count, thread_count)
+
+ @RequiresIsolation
+ @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
+ def testApplySaturatesMultiProcessSingleThread(self):
+ self._TestApplySaturatesAvailableProcessesAndThreads(3, 1)
+
+ @RequiresIsolation
+ def testApplySaturatesSingleProcessMultiThread(self):
+ self._TestApplySaturatesAvailableProcessesAndThreads(1, 3)
+
+ @RequiresIsolation
+ @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
+ def testApplySaturatesMultiProcessMultiThread(self):
+ self._TestApplySaturatesAvailableProcessesAndThreads(3, 3)
+
+ @RequiresIsolation
+ def _TestApplySaturatesAvailableProcessesAndThreads(self, process_count,
+ thread_count):
+ """Tests that created processes and threads evenly share tasks."""
+ calls_per_thread = 2
+ args = [()] * (process_count * thread_count * calls_per_thread)
+ expected_calls_per_thread = calls_per_thread
+
+ if not self.command_class(True).multiprocessing_is_available:
+ # When multiprocessing is unavailable, only a single process is used.
+ # Calls should be evenly distributed across threads.
+ expected_calls_per_thread = calls_per_thread * process_count
+
+ results = self._RunApply(_SleepThenReturnProcAndThreadId, args,
+ process_count, thread_count)
+ usage_dict = {} # (process_id, thread_id): number of tasks performed
+ for (process_id, thread_id) in results:
+ usage_dict[(process_id, thread_id)] = (
+ usage_dict.get((process_id, thread_id), 0) + 1)
+
+ for (id_tuple, num_tasks_completed) in usage_dict.iteritems():
+ self.assertEqual(num_tasks_completed, expected_calls_per_thread,
+ 'Process %s thread %s completed %s tasks. Expected: %s' %
+ (id_tuple[0], id_tuple[1], num_tasks_completed,
+ expected_calls_per_thread))
+
+ @RequiresIsolation
def testIteratorFailureSingleProcessSingleThread(self):
self._TestIteratorFailure(1, 1)
@@ -353,12 +438,22 @@ class TestParallelismFramework(testcase.GsUtilUnitTestCase):
expected_sum += len(arg)
self.assertEqual(expected_sum, command_inst.arg_length_sum)
- # Test that shared variables work when the iterator fails.
- command_inst = self.command_class(True)
- args = FailingIterator(10, [1, 3, 5])
- self._RunApply(_ReturnOneValue, args, process_count, thread_count,
- command_inst=command_inst, shared_attrs=['failure_count'])
- self.assertEqual(3, command_inst.failure_count)
+ # Test that shared variables work when the iterator fails at the beginning,
+ # middle, and end.
+ for (failing_iterator, expected_failure_count) in (
+ (FailingIterator(5, [0]), 1),
+ (FailingIterator(10, [1, 3, 5]), 3),
+ (FailingIterator(5, [4]), 1)):
+ command_inst = self.command_class(True)
+ args = failing_iterator
+ self._RunApply(_ReturnOneValue, args, process_count, thread_count,
+ command_inst=command_inst, shared_attrs=['failure_count'])
+ self.assertEqual(
+ expected_failure_count, command_inst.failure_count,
+ msg='Failure count did not match. Expected: %s, actual: %s '
+ 'for failing iterator of size %s, failing indices %s' %
+ (expected_failure_count, command_inst.failure_count,
+ failing_iterator.size, failing_iterator.failure_indices))
@RequiresIsolation
def testThreadsSurviveExceptionsInFuncSingleProcessSingleThread(self):
@@ -564,11 +659,11 @@ class TestParallelismFrameworkWithoutMultiprocessing(TestParallelismFramework):
"""Tests parallelism framework works with multiprocessing module unavailable.
Notably, this test has no way to override previous calls
- to gslib.util.MultiprocessingIsAvailable to prevent the initialization of
- all of the global variables in command.py, so this still behaves slightly
- differently than the behavior one would see on a machine where the
- multiprocessing functionality is actually not available (in particular, it
- will not catch the case where a global variable that is not available for
- the sequential path is referenced before initialization).
+ to gslib.util.CheckMultiprocessingAvailableAndInit to prevent the
+ initialization of all of the global variables in command.py, so this still
+ behaves slightly differently than the behavior one would see on a machine
+ where the multiprocessing functionality is actually not available (in
+ particular, it will not catch the case where a global variable that is not
+ available for the sequential path is referenced before initialization).
"""
command_class = FakeCommandWithoutMultiprocessingModule

Powered by Google App Engine
This is Rietveld 408576698