| Index: tools/telemetry/third_party/gsutilz/gslib/tests/test_parallelism_framework.py
|
| diff --git a/tools/telemetry/third_party/gsutilz/gslib/tests/test_parallelism_framework.py b/tools/telemetry/third_party/gsutilz/gslib/tests/test_parallelism_framework.py
|
| deleted file mode 100644
|
| index e895c1680a4c99fcfbaa116fc9f95cc3af06c53b..0000000000000000000000000000000000000000
|
| --- a/tools/telemetry/third_party/gsutilz/gslib/tests/test_parallelism_framework.py
|
| +++ /dev/null
|
| @@ -1,574 +0,0 @@
|
| -# -*- coding: utf-8 -*-
|
| -# Copyright 2013 Google Inc. All Rights Reserved.
|
| -#
|
| -# Permission is hereby granted, free of charge, to any person obtaining a
|
| -# copy of this software and associated documentation files (the
|
| -# "Software"), to deal in the Software without restriction, including
|
| -# without limitation the rights to use, copy, modify, merge, publish, dis-
|
| -# tribute, sublicense, and/or sell copies of the Software, and to permit
|
| -# persons to whom the Software is furnished to do so, subject to the fol-
|
| -# lowing conditions:
|
| -#
|
| -# The above copyright notice and this permission notice shall be included
|
| -# in all copies or substantial portions of the Software.
|
| -#
|
| -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
| -# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
|
| -# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
|
| -# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
|
| -# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
| -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
|
| -# IN THE SOFTWARE.
|
| -"""Unit tests for gsutil parallelism framework."""
|
| -
|
| -from __future__ import absolute_import
|
| -
|
| -import functools
|
| -import signal
|
| -
|
| -from boto.storage_uri import BucketStorageUri
|
| -from gslib import cs_api_map
|
| -from gslib.command import Command
|
| -from gslib.command import CreateGsutilLogger
|
| -from gslib.command import DummyArgChecker
|
| -import gslib.tests.testcase as testcase
|
| -from gslib.tests.testcase.base import RequiresIsolation
|
| -from gslib.tests.util import unittest
|
| -from gslib.util import IS_WINDOWS
|
| -from gslib.util import MultiprocessingIsAvailable
|
| -
|
| -
|
| -# Amount of time for an individual test to run before timing out. We need a
|
| -# reasonably high value since if many tests are running in parallel, an
|
| -# individual test may take a while to complete.
|
| -_TEST_TIMEOUT_SECONDS = 120
|
| -
|
| -
|
| -def Timeout(func):
|
| - """Decorator used to provide a timeout for functions."""
|
| - @functools.wraps(func)
|
| - def Wrapper(*args, **kwargs):
|
| - if not IS_WINDOWS:
|
| - signal.signal(signal.SIGALRM, _HandleAlarm)
|
| - signal.alarm(_TEST_TIMEOUT_SECONDS)
|
| - try:
|
| - func(*args, **kwargs)
|
| - finally:
|
| - if not IS_WINDOWS:
|
| - signal.alarm(0) # Cancel the alarm.
|
| - return Wrapper
|
| -
|
| -
|
| -# pylint: disable=unused-argument
|
| -def _HandleAlarm(signal_num, cur_stack_frame):
|
| - raise Exception('Test timed out.')
|
| -
|
| -
|
| -class CustomException(Exception):
|
| -
|
| - def __init__(self, exception_str):
|
| - super(CustomException, self).__init__(exception_str)
|
| -
|
| -
|
| -def _ReturnOneValue(cls, args, thread_state=None):
|
| - return 1
|
| -
|
| -
|
| -def _FailureFunc(cls, args, thread_state=None):
|
| - raise CustomException('Failing on purpose.')
|
| -
|
| -
|
| -def _FailingExceptionHandler(cls, e):
|
| - cls.failure_count += 1
|
| - raise CustomException('Exception handler failing on purpose.')
|
| -
|
| -
|
| -def _ExceptionHandler(cls, e):
|
| - cls.logger.exception(e)
|
| - cls.failure_count += 1
|
| -
|
| -
|
| -def _IncrementByLength(cls, args, thread_state=None):
|
| - cls.arg_length_sum += len(args)
|
| -
|
| -
|
| -def _AdjustProcessCountIfWindows(process_count):
|
| - if IS_WINDOWS:
|
| - return 1
|
| - else:
|
| - return process_count
|
| -
|
| -
|
| -def _ReApplyWithReplicatedArguments(cls, args, thread_state=None):
|
| - """Calls Apply with arguments repeated seven times.
|
| -
|
| - The first two elements of args should be the process and thread counts,
|
| - respectively, to be used for the recursive calls.
|
| -
|
| - Args:
|
| - cls: The Command class to call Apply on.
|
| - args: Arguments to pass to Apply.
|
| - thread_state: Unused, required by function signature.
|
| -
|
| - Returns:
|
| - Number of values returned by the two calls to Apply.
|
| - """
|
| - new_args = [args] * 7
|
| - process_count = _AdjustProcessCountIfWindows(args[0])
|
| - thread_count = args[1]
|
| - return_values = cls.Apply(_PerformNRecursiveCalls, new_args,
|
| - _ExceptionHandler, arg_checker=DummyArgChecker,
|
| - process_count=process_count,
|
| - thread_count=thread_count,
|
| - should_return_results=True)
|
| - ret = sum(return_values)
|
| -
|
| - return_values = cls.Apply(_ReturnOneValue, new_args,
|
| - _ExceptionHandler, arg_checker=DummyArgChecker,
|
| - process_count=process_count,
|
| - thread_count=thread_count,
|
| - should_return_results=True)
|
| -
|
| - return len(return_values) + ret
|
| -
|
| -
|
| -def _PerformNRecursiveCalls(cls, args, thread_state=None):
|
| - """Calls Apply to perform N recursive calls.
|
| -
|
| - The first two elements of args should be the process and thread counts,
|
| - respectively, to be used for the recursive calls, while N is the third element
|
| - (the number of recursive calls to make).
|
| -
|
| - Args:
|
| - cls: The Command class to call Apply on.
|
| - args: Arguments to pass to Apply.
|
| - thread_state: Unused, required by function signature.
|
| -
|
| - Returns:
|
| - Number of values returned by the call to Apply.
|
| - """
|
| - process_count = _AdjustProcessCountIfWindows(args[0])
|
| - thread_count = args[1]
|
| - return_values = cls.Apply(_ReturnOneValue, [()] * args[2], _ExceptionHandler,
|
| - arg_checker=DummyArgChecker,
|
| - process_count=process_count,
|
| - thread_count=thread_count,
|
| - should_return_results=True)
|
| - return len(return_values)
|
| -
|
| -
|
| -def _SkipEvenNumbersArgChecker(cls, arg):
|
| - return arg % 2 != 0
|
| -
|
| -
|
| -class FailingIterator(object):
|
| -
|
| - def __init__(self, size, failure_indices):
|
| - self.size = size
|
| - self.failure_indices = failure_indices
|
| - self.current_index = 0
|
| -
|
| - def __iter__(self):
|
| - return self
|
| -
|
| - def next(self):
|
| - if self.current_index == self.size:
|
| - raise StopIteration('')
|
| - elif self.current_index in self.failure_indices:
|
| - self.current_index += 1
|
| - raise CustomException(
|
| - 'Iterator failing on purpose at index %d.' % self.current_index)
|
| - else:
|
| - self.current_index += 1
|
| - return self.current_index - 1
|
| -
|
| -
|
| -class FakeCommand(Command):
|
| - """Fake command class for overriding command instance state."""
|
| - command_spec = Command.CreateCommandSpec(
|
| - 'fake',
|
| - command_name_aliases=[],
|
| - )
|
| - # Help specification. See help_provider.py for documentation.
|
| - help_spec = Command.HelpSpec(
|
| - help_name='fake',
|
| - help_name_aliases=[],
|
| - help_type='command_help',
|
| - help_one_line_summary='Something to take up space.',
|
| - help_text='Something else to take up space.',
|
| - subcommand_help_text={},
|
| - )
|
| -
|
| - def __init__(self, do_parallel):
|
| - self.bucket_storage_uri_class = BucketStorageUri
|
| - support_map = {
|
| - 'gs': ['JSON'],
|
| - 's3': ['XML']
|
| - }
|
| - default_map = {
|
| - 'gs': 'JSON',
|
| - 's3': 'XML'
|
| - }
|
| - self.gsutil_api_map = cs_api_map.GsutilApiMapFactory.GetApiMap(
|
| - cs_api_map.GsutilApiClassMapFactory, support_map, default_map)
|
| - self.logger = CreateGsutilLogger('FakeCommand')
|
| - self.parallel_operations = do_parallel
|
| - self.failure_count = 0
|
| - self.multiprocessing_is_available = MultiprocessingIsAvailable()[0]
|
| - self.debug = 0
|
| -
|
| -
|
| -class FakeCommandWithoutMultiprocessingModule(FakeCommand):
|
| -
|
| - def __init__(self, do_parallel):
|
| - super(FakeCommandWithoutMultiprocessingModule, self).__init__(do_parallel)
|
| - self.multiprocessing_is_available = False
|
| -
|
| -
|
| -# TODO: Figure out a good way to test that ctrl+C really stops execution,
|
| -# and also that ctrl+C works when there are still tasks enqueued.
|
| -class TestParallelismFramework(testcase.GsUtilUnitTestCase):
|
| - """gsutil parallelism framework test suite."""
|
| -
|
| - command_class = FakeCommand
|
| -
|
| - def _RunApply(self, func, args_iterator, process_count, thread_count,
|
| - command_inst=None, shared_attrs=None, fail_on_error=False,
|
| - thr_exc_handler=None, arg_checker=DummyArgChecker):
|
| - command_inst = command_inst or self.command_class(True)
|
| - exception_handler = thr_exc_handler or _ExceptionHandler
|
| -
|
| - return command_inst.Apply(func, args_iterator, exception_handler,
|
| - thread_count=thread_count,
|
| - process_count=process_count,
|
| - arg_checker=arg_checker,
|
| - should_return_results=True,
|
| - shared_attrs=shared_attrs,
|
| - fail_on_error=fail_on_error)
|
| -
|
| - @RequiresIsolation
|
| - def testBasicApplySingleProcessSingleThread(self):
|
| - self._TestBasicApply(1, 1)
|
| -
|
| - @RequiresIsolation
|
| - def testBasicApplySingleProcessMultiThread(self):
|
| - self._TestBasicApply(1, 3)
|
| -
|
| - @RequiresIsolation
|
| - @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
|
| - def testBasicApplyMultiProcessSingleThread(self):
|
| - self._TestBasicApply(3, 1)
|
| -
|
| - @RequiresIsolation
|
| - @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
|
| - def testBasicApplyMultiProcessMultiThread(self):
|
| - self._TestBasicApply(3, 3)
|
| -
|
| - @Timeout
|
| - def _TestBasicApply(self, process_count, thread_count):
|
| - args = [()] * (17 * process_count * thread_count + 1)
|
| -
|
| - results = self._RunApply(_ReturnOneValue, args, process_count, thread_count)
|
| - self.assertEqual(len(args), len(results))
|
| -
|
| - @RequiresIsolation
|
| - def testIteratorFailureSingleProcessSingleThread(self):
|
| - self._TestIteratorFailure(1, 1)
|
| -
|
| - @RequiresIsolation
|
| - def testIteratorFailureSingleProcessMultiThread(self):
|
| - self._TestIteratorFailure(1, 3)
|
| -
|
| - @RequiresIsolation
|
| - @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
|
| - def testIteratorFailureMultiProcessSingleThread(self):
|
| - self._TestIteratorFailure(3, 1)
|
| -
|
| - @RequiresIsolation
|
| - @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
|
| - def testIteratorFailureMultiProcessMultiThread(self):
|
| - self._TestIteratorFailure(3, 3)
|
| -
|
| - @Timeout
|
| - def _TestIteratorFailure(self, process_count, thread_count):
|
| - """Tests apply with a failing iterator."""
|
| - # Tests for fail_on_error == False.
|
| -
|
| - args = FailingIterator(10, [0])
|
| - results = self._RunApply(_ReturnOneValue, args, process_count, thread_count)
|
| - self.assertEqual(9, len(results))
|
| -
|
| - args = FailingIterator(10, [5])
|
| - results = self._RunApply(_ReturnOneValue, args, process_count, thread_count)
|
| - self.assertEqual(9, len(results))
|
| -
|
| - args = FailingIterator(10, [9])
|
| - results = self._RunApply(_ReturnOneValue, args, process_count, thread_count)
|
| - self.assertEqual(9, len(results))
|
| -
|
| - if process_count * thread_count > 1:
|
| - # In this case, we should ignore the fail_on_error flag.
|
| - args = FailingIterator(10, [9])
|
| - results = self._RunApply(_ReturnOneValue, args, process_count,
|
| - thread_count, fail_on_error=True)
|
| - self.assertEqual(9, len(results))
|
| -
|
| - args = FailingIterator(10, range(10))
|
| - results = self._RunApply(_ReturnOneValue, args, process_count, thread_count)
|
| - self.assertEqual(0, len(results))
|
| -
|
| - args = FailingIterator(0, [])
|
| - results = self._RunApply(_ReturnOneValue, args, process_count, thread_count)
|
| - self.assertEqual(0, len(results))
|
| -
|
| - @RequiresIsolation
|
| - def testTestSharedAttrsWorkSingleProcessSingleThread(self):
|
| - self._TestSharedAttrsWork(1, 1)
|
| -
|
| - @RequiresIsolation
|
| - def testTestSharedAttrsWorkSingleProcessMultiThread(self):
|
| - self._TestSharedAttrsWork(1, 3)
|
| -
|
| - @RequiresIsolation
|
| - @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
|
| - def testTestSharedAttrsWorkMultiProcessSingleThread(self):
|
| - self._TestSharedAttrsWork(3, 1)
|
| -
|
| - @RequiresIsolation
|
| - @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
|
| - def testTestSharedAttrsWorkMultiProcessMultiThread(self):
|
| - self._TestSharedAttrsWork(3, 3)
|
| -
|
| - @Timeout
|
| - def _TestSharedAttrsWork(self, process_count, thread_count):
|
| - """Tests that Apply successfully uses shared_attrs."""
|
| - command_inst = self.command_class(True)
|
| - command_inst.arg_length_sum = 19
|
| - args = ['foo', ['bar', 'baz'], [], ['x', 'y'], [], 'abcd']
|
| - self._RunApply(_IncrementByLength, args, process_count,
|
| - thread_count, command_inst=command_inst,
|
| - shared_attrs=['arg_length_sum'])
|
| - expected_sum = 19
|
| - for arg in args:
|
| - expected_sum += len(arg)
|
| - self.assertEqual(expected_sum, command_inst.arg_length_sum)
|
| -
|
| - # Test that shared variables work when the iterator fails.
|
| - command_inst = self.command_class(True)
|
| - args = FailingIterator(10, [1, 3, 5])
|
| - self._RunApply(_ReturnOneValue, args, process_count, thread_count,
|
| - command_inst=command_inst, shared_attrs=['failure_count'])
|
| - self.assertEqual(3, command_inst.failure_count)
|
| -
|
| - @RequiresIsolation
|
| - def testThreadsSurviveExceptionsInFuncSingleProcessSingleThread(self):
|
| - self._TestThreadsSurviveExceptionsInFunc(1, 1)
|
| -
|
| - @RequiresIsolation
|
| - def testThreadsSurviveExceptionsInFuncSingleProcessMultiThread(self):
|
| - self._TestThreadsSurviveExceptionsInFunc(1, 3)
|
| -
|
| - @RequiresIsolation
|
| - @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
|
| - def testThreadsSurviveExceptionsInFuncMultiProcessSingleThread(self):
|
| - self._TestThreadsSurviveExceptionsInFunc(3, 1)
|
| -
|
| - @RequiresIsolation
|
| - @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
|
| - def testThreadsSurviveExceptionsInFuncMultiProcessMultiThread(self):
|
| - self._TestThreadsSurviveExceptionsInFunc(3, 3)
|
| -
|
| - @Timeout
|
| - def _TestThreadsSurviveExceptionsInFunc(self, process_count, thread_count):
|
| - command_inst = self.command_class(True)
|
| - args = ([()] * 5)
|
| - self._RunApply(_FailureFunc, args, process_count, thread_count,
|
| - command_inst=command_inst, shared_attrs=['failure_count'],
|
| - thr_exc_handler=_FailingExceptionHandler)
|
| - self.assertEqual(len(args), command_inst.failure_count)
|
| -
|
| - @RequiresIsolation
|
| - def testThreadsSurviveExceptionsInHandlerSingleProcessSingleThread(self):
|
| - self._TestThreadsSurviveExceptionsInHandler(1, 1)
|
| -
|
| - @RequiresIsolation
|
| - def testThreadsSurviveExceptionsInHandlerSingleProcessMultiThread(self):
|
| - self._TestThreadsSurviveExceptionsInHandler(1, 3)
|
| -
|
| - @RequiresIsolation
|
| - @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
|
| - def testThreadsSurviveExceptionsInHandlerMultiProcessSingleThread(self):
|
| - self._TestThreadsSurviveExceptionsInHandler(3, 1)
|
| -
|
| - @RequiresIsolation
|
| - @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
|
| - def testThreadsSurviveExceptionsInHandlerMultiProcessMultiThread(self):
|
| - self._TestThreadsSurviveExceptionsInHandler(3, 3)
|
| -
|
| - @Timeout
|
| - def _TestThreadsSurviveExceptionsInHandler(self, process_count, thread_count):
|
| - command_inst = self.command_class(True)
|
| - args = ([()] * 5)
|
| - self._RunApply(_FailureFunc, args, process_count, thread_count,
|
| - command_inst=command_inst, shared_attrs=['failure_count'],
|
| - thr_exc_handler=_FailingExceptionHandler)
|
| - self.assertEqual(len(args), command_inst.failure_count)
|
| -
|
| - @RequiresIsolation
|
| - @Timeout
|
| - def testFailOnErrorFlag(self):
|
| - """Tests that fail_on_error produces the correct exception on failure."""
|
| - def _ExpectCustomException(test_func):
|
| - try:
|
| - test_func()
|
| - self.fail(
|
| - 'Setting fail_on_error should raise any exception encountered.')
|
| - except CustomException, e:
|
| - pass
|
| - except Exception, e:
|
| - self.fail('Got unexpected error: ' + str(e))
|
| -
|
| - def _RunFailureFunc():
|
| - command_inst = self.command_class(True)
|
| - args = ([()] * 5)
|
| - self._RunApply(_FailureFunc, args, 1, 1, command_inst=command_inst,
|
| - shared_attrs=['failure_count'], fail_on_error=True)
|
| - _ExpectCustomException(_RunFailureFunc)
|
| -
|
| - def _RunFailingIteratorFirstPosition():
|
| - args = FailingIterator(10, [0])
|
| - results = self._RunApply(_ReturnOneValue, args, 1, 1, fail_on_error=True)
|
| - self.assertEqual(0, len(results))
|
| - _ExpectCustomException(_RunFailingIteratorFirstPosition)
|
| -
|
| - def _RunFailingIteratorPositionMiddlePosition():
|
| - args = FailingIterator(10, [5])
|
| - results = self._RunApply(_ReturnOneValue, args, 1, 1, fail_on_error=True)
|
| - self.assertEqual(5, len(results))
|
| - _ExpectCustomException(_RunFailingIteratorPositionMiddlePosition)
|
| -
|
| - def _RunFailingIteratorLastPosition():
|
| - args = FailingIterator(10, [9])
|
| - results = self._RunApply(_ReturnOneValue, args, 1, 1, fail_on_error=True)
|
| - self.assertEqual(9, len(results))
|
| - _ExpectCustomException(_RunFailingIteratorLastPosition)
|
| -
|
| - def _RunFailingIteratorMultiplePositions():
|
| - args = FailingIterator(10, [1, 3, 5])
|
| - results = self._RunApply(_ReturnOneValue, args, 1, 1, fail_on_error=True)
|
| - self.assertEqual(1, len(results))
|
| - _ExpectCustomException(_RunFailingIteratorMultiplePositions)
|
| -
|
| - @RequiresIsolation
|
| - def testRecursiveDepthThreeDifferentFunctionsSingleProcessSingleThread(self):
|
| - self._TestRecursiveDepthThreeDifferentFunctions(1, 1)
|
| -
|
| - @RequiresIsolation
|
| - def testRecursiveDepthThreeDifferentFunctionsSingleProcessMultiThread(self):
|
| - self._TestRecursiveDepthThreeDifferentFunctions(1, 3)
|
| -
|
| - @RequiresIsolation
|
| - @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
|
| - def testRecursiveDepthThreeDifferentFunctionsMultiProcessSingleThread(self):
|
| - self._TestRecursiveDepthThreeDifferentFunctions(3, 1)
|
| -
|
| - @RequiresIsolation
|
| - @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
|
| - def testRecursiveDepthThreeDifferentFunctionsMultiProcessMultiThread(self):
|
| - self._TestRecursiveDepthThreeDifferentFunctions(3, 3)
|
| -
|
| - @Timeout
|
| - def _TestRecursiveDepthThreeDifferentFunctions(self, process_count,
|
| - thread_count):
|
| - """Tests recursive application of Apply.
|
| -
|
| - Calls Apply(A), where A calls Apply(B) followed by Apply(C) and B calls
|
| - Apply(C).
|
| -
|
| - Args:
|
| - process_count: Number of processes to use.
|
| - thread_count: Number of threads to use.
|
| - """
|
| - base_args = [3, 1, 4, 1, 5]
|
| - args = [[process_count, thread_count, count] for count in base_args]
|
| -
|
| - results = self._RunApply(_ReApplyWithReplicatedArguments, args,
|
| - process_count, thread_count)
|
| - self.assertEqual(7 * (sum(base_args) + len(base_args)), sum(results))
|
| -
|
| - @RequiresIsolation
|
| - def testExceptionInProducerRaisesAndTerminatesSingleProcessSingleThread(self):
|
| - self._TestExceptionInProducerRaisesAndTerminates(1, 1)
|
| -
|
| - @RequiresIsolation
|
| - def testExceptionInProducerRaisesAndTerminatesSingleProcessMultiThread(self):
|
| - self._TestExceptionInProducerRaisesAndTerminates(1, 3)
|
| -
|
| - @RequiresIsolation
|
| - @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
|
| - def testExceptionInProducerRaisesAndTerminatesMultiProcessSingleThread(self):
|
| - self._TestExceptionInProducerRaisesAndTerminates(3, 1)
|
| -
|
| - @RequiresIsolation
|
| - @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
|
| - def testExceptionInProducerRaisesAndTerminatesMultiProcessMultiThread(self):
|
| - self._TestExceptionInProducerRaisesAndTerminates(3, 3)
|
| -
|
| - @Timeout
|
| - def _TestExceptionInProducerRaisesAndTerminates(self, process_count,
|
| - thread_count):
|
| - args = self # The ProducerThread will try and fail to iterate over this.
|
| - try:
|
| - self._RunApply(_ReturnOneValue, args, process_count, thread_count)
|
| - self.fail('Did not raise expected exception.')
|
| - except TypeError:
|
| - pass
|
| -
|
| - @RequiresIsolation
|
| - def testSkippedArgumentsSingleThreadSingleProcess(self):
|
| - self._TestSkippedArguments(1, 1)
|
| -
|
| - @RequiresIsolation
|
| - def testSkippedArgumentsMultiThreadSingleProcess(self):
|
| - self._TestSkippedArguments(1, 3)
|
| -
|
| - @RequiresIsolation
|
| - @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
|
| - def testSkippedArgumentsSingleThreadMultiProcess(self):
|
| - self._TestSkippedArguments(3, 1)
|
| -
|
| - @RequiresIsolation
|
| - @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
|
| - def testSkippedArgumentsMultiThreadMultiProcess(self):
|
| - self._TestSkippedArguments(3, 3)
|
| -
|
| - @Timeout
|
| - def _TestSkippedArguments(self, process_count, thread_count):
|
| -
|
| - # Skip a proper subset of the arguments.
|
| - n = 2 * process_count * thread_count
|
| - args = range(1, n + 1)
|
| - results = self._RunApply(_ReturnOneValue, args, process_count, thread_count,
|
| - arg_checker=_SkipEvenNumbersArgChecker)
|
| - self.assertEqual(n / 2, len(results)) # We know n is even.
|
| - self.assertEqual(n / 2, sum(results))
|
| -
|
| - # Skip all arguments.
|
| - args = [2 * x for x in args]
|
| - results = self._RunApply(_ReturnOneValue, args, process_count, thread_count,
|
| - arg_checker=_SkipEvenNumbersArgChecker)
|
| - self.assertEqual(0, len(results))
|
| -
|
| -
|
| -class TestParallelismFrameworkWithoutMultiprocessing(TestParallelismFramework):
|
| - """Tests parallelism framework works with multiprocessing module unavailable.
|
| -
|
| - Notably, this test has no way to override previous calls
|
| - to gslib.util.MultiprocessingIsAvailable to prevent the initialization of
|
| - all of the global variables in command.py, so this still behaves slightly
|
| - differently than the behavior one would see on a machine where the
|
| - multiprocessing functionality is actually not available (in particular, it
|
| - will not catch the case where a global variable that is not available for
|
| - the sequential path is referenced before initialization).
|
| - """
|
| - command_class = FakeCommandWithoutMultiprocessingModule
|
|
|