| Index: gslib/tests/test_parallelism_framework.py
|
| ===================================================================
|
| --- gslib/tests/test_parallelism_framework.py (revision 33376)
|
| +++ gslib/tests/test_parallelism_framework.py (working copy)
|
| @@ -1,3 +1,4 @@
|
| +# -*- coding: utf-8 -*-
|
| # Copyright 2013 Google Inc. All Rights Reserved.
|
| #
|
| # Permission is hereby granted, free of charge, to any person obtaining a
|
| @@ -18,26 +19,19 @@
|
| # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
| # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
|
| # IN THE SOFTWARE.
|
| -
|
| """Unit tests for gsutil parallelism framework."""
|
|
|
| +from __future__ import absolute_import
|
| +
|
| import functools
|
| -import gslib.help_provider as help_provider
|
| -import gslib.tests.testcase as testcase
|
| import signal
|
|
|
| +from boto.storage_uri import BucketStorageUri
|
| +from gslib import cs_api_map
|
| from gslib.command import Command
|
| -from gslib.command import COMMAND_NAME
|
| -from gslib.command import COMMAND_NAME_ALIASES
|
| +from gslib.command import CreateGsutilLogger
|
| from gslib.command import DummyArgChecker
|
| -from gslib.command import CreateGsutilLogger
|
| -from gslib.commands.lifecycle import LifecycleCommand
|
| -from gslib.help_provider import HELP_NAME
|
| -from gslib.help_provider import HELP_NAME_ALIASES
|
| -from gslib.help_provider import HELP_ONE_LINE_SUMMARY
|
| -from gslib.help_provider import HELP_TEXT
|
| -from gslib.help_provider import HelpType
|
| -from gslib.help_provider import HELP_TYPE
|
| +import gslib.tests.testcase as testcase
|
| from gslib.tests.util import unittest
|
| from gslib.util import IS_WINDOWS
|
| from gslib.util import MultiprocessingIsAvailable
|
| @@ -46,7 +40,7 @@
|
| def Timeout(func):
|
| """Decorator used to provide a timeout for functions."""
|
| @functools.wraps(func)
|
| - def wrapper(*args, **kwargs):
|
| + def Wrapper(*args, **kwargs):
|
| if not IS_WINDOWS:
|
| signal.signal(signal.SIGALRM, _HandleAlarm)
|
| signal.alarm(5)
|
| @@ -55,33 +49,42 @@
|
| finally:
|
| if not IS_WINDOWS:
|
| signal.alarm(0) # Cancel the alarm.
|
| - return wrapper
|
| + return Wrapper
|
|
|
| +
|
| +# pylint: disable=unused-argument
|
| def _HandleAlarm(signal_num, cur_stack_frame):
|
| - raise Exception("Test timed out.")
|
| + raise Exception('Test timed out.')
|
|
|
| +
|
| class CustomException(Exception):
|
| +
|
| def __init__(self, exception_str):
|
| super(CustomException, self).__init__(exception_str)
|
|
|
|
|
| -def _ReturnOneValue(cls, args):
|
| +def _ReturnOneValue(cls, args, thread_state=None):
|
| return 1
|
|
|
| -def _FailureFunc(cls, args):
|
| - raise CustomException("Failing on purpose.")
|
|
|
| +def _FailureFunc(cls, args, thread_state=None):
|
| + raise CustomException('Failing on purpose.')
|
| +
|
| +
|
| def _FailingExceptionHandler(cls, e):
|
| cls.failure_count += 1
|
| - raise CustomException("Exception handler failing on purpose.")
|
| + raise CustomException('Exception handler failing on purpose.')
|
|
|
| +
|
| def _ExceptionHandler(cls, e):
|
| cls.logger.exception(e)
|
| cls.failure_count += 1
|
|
|
| -def _IncrementByLength(cls, args):
|
| +
|
| +def _IncrementByLength(cls, args, thread_state=None):
|
| cls.arg_length_sum += len(args)
|
|
|
| +
|
| def _AdjustProcessCountIfWindows(process_count):
|
| if IS_WINDOWS:
|
| return 1
|
| @@ -89,7 +92,8 @@
|
| return process_count
|
|
|
|
|
| -def _ReApplyWithReplicatedArguments(cls, args):
|
| +def _ReApplyWithReplicatedArguments(cls, args, thread_state=None):
|
| + """Calls Apply with arguments repeated seven times."""
|
| new_args = [args] * 7
|
| process_count = _AdjustProcessCountIfWindows(2)
|
| return_values = cls.Apply(_PerformNRecursiveCalls, new_args,
|
| @@ -97,16 +101,16 @@
|
| process_count=process_count, thread_count=2,
|
| should_return_results=True)
|
| ret = sum(return_values)
|
| -
|
| +
|
| return_values = cls.Apply(_ReturnOneValue, new_args,
|
| _ExceptionHandler, arg_checker=DummyArgChecker,
|
| process_count=process_count, thread_count=2,
|
| should_return_results=True)
|
| -
|
| +
|
| return len(return_values) + ret
|
|
|
|
|
| -def _PerformNRecursiveCalls(cls, args):
|
| +def _PerformNRecursiveCalls(cls, args, thread_state=None):
|
| process_count = _AdjustProcessCountIfWindows(2)
|
| return_values = cls.Apply(_ReturnOneValue, [()] * args, _ExceptionHandler,
|
| arg_checker=DummyArgChecker,
|
| @@ -120,6 +124,7 @@
|
|
|
|
|
| class FailingIterator(object):
|
| +
|
| def __init__(self, size, failure_indices):
|
| self.size = size
|
| self.failure_indices = failure_indices
|
| @@ -141,26 +146,42 @@
|
|
|
|
|
| class FakeCommand(Command):
|
| - command_spec = {
|
| - COMMAND_NAME : 'fake',
|
| - COMMAND_NAME_ALIASES : [],
|
| - }
|
| - help_spec = {
|
| - HELP_NAME : 'fake',
|
| - HELP_NAME_ALIASES : [],
|
| - HELP_TYPE : HelpType.COMMAND_HELP,
|
| - HELP_ONE_LINE_SUMMARY : 'Something to take up space.',
|
| - HELP_TEXT : 'Something else to take up space.',
|
| - }
|
| + """Fake command class for overriding command instance state."""
|
| + command_spec = Command.CreateCommandSpec(
|
| + 'fake',
|
| + command_name_aliases=[],
|
| + )
|
| + # Help specification. See help_provider.py for documentation.
|
| + help_spec = Command.HelpSpec(
|
| + help_name='fake',
|
| + help_name_aliases=[],
|
| + help_type='command_help',
|
| + help_one_line_summary='Something to take up space.',
|
| + help_text='Something else to take up space.',
|
| + subcommand_help_text={},
|
| + )
|
|
|
| def __init__(self, do_parallel):
|
| + self.bucket_storage_uri_class = BucketStorageUri
|
| + support_map = {
|
| + 'gs': ['JSON'],
|
| + 's3': ['XML']
|
| + }
|
| + default_map = {
|
| + 'gs': 'JSON',
|
| + 's3': 'XML'
|
| + }
|
| + self.gsutil_api_map = cs_api_map.GsutilApiMapFactory.GetApiMap(
|
| + cs_api_map.GsutilApiClassMapFactory, support_map, default_map)
|
| self.logger = CreateGsutilLogger('FakeCommand')
|
| self.parallel_operations = do_parallel
|
| self.failure_count = 0
|
| self.multiprocessing_is_available = MultiprocessingIsAvailable()[0]
|
| + self.debug = 0
|
|
|
|
|
| class FakeCommandWithoutMultiprocessingModule(FakeCommand):
|
| +
|
| def __init__(self, do_parallel):
|
| super(FakeCommandWithoutMultiprocessingModule, self).__init__(do_parallel)
|
| self.multiprocessing_is_available = False
|
| @@ -170,7 +191,7 @@
|
| # and also that ctrl+C works when there are still tasks enqueued.
|
| class TestParallelismFramework(testcase.GsUtilUnitTestCase):
|
| """gsutil parallelism framework test suite."""
|
| -
|
| +
|
| command_class = FakeCommand
|
|
|
| def _RunApply(self, func, args_iterator, process_count, thread_count,
|
| @@ -178,6 +199,7 @@
|
| thr_exc_handler=None, arg_checker=DummyArgChecker):
|
| command_inst = command_inst or self.command_class(True)
|
| exception_handler = thr_exc_handler or _ExceptionHandler
|
| +
|
| return command_inst.Apply(func, args_iterator, exception_handler,
|
| thread_count=thread_count,
|
| process_count=process_count,
|
| @@ -223,6 +245,7 @@
|
|
|
| @Timeout
|
| def _TestIteratorFailure(self, process_count, thread_count):
|
| + """Tests apply with a failing iterator."""
|
| # Tests for fail_on_error == False.
|
|
|
| args = FailingIterator(10, [0])
|
| @@ -236,7 +259,7 @@
|
| args = FailingIterator(10, [9])
|
| results = self._RunApply(_ReturnOneValue, args, process_count, thread_count)
|
| self.assertEqual(9, len(results))
|
| -
|
| +
|
| if process_count * thread_count > 1:
|
| # In this case, we should ignore the fail_on_error flag.
|
| args = FailingIterator(10, [9])
|
| @@ -247,7 +270,7 @@
|
| args = FailingIterator(10, range(10))
|
| results = self._RunApply(_ReturnOneValue, args, process_count, thread_count)
|
| self.assertEqual(0, len(results))
|
| -
|
| +
|
| args = FailingIterator(0, [])
|
| results = self._RunApply(_ReturnOneValue, args, process_count, thread_count)
|
| self.assertEqual(0, len(results))
|
| @@ -268,6 +291,7 @@
|
|
|
| @Timeout
|
| def _TestSharedAttrsWork(self, process_count, thread_count):
|
| + """Tests that Apply successfully uses shared_attrs."""
|
| command_inst = self.command_class(True)
|
| command_inst.arg_length_sum = 19
|
| args = ['foo', ['bar', 'baz'], [], ['x', 'y'], [], 'abcd']
|
| @@ -285,7 +309,7 @@
|
| self._RunApply(_ReturnOneValue, args, process_count, thread_count,
|
| command_inst=command_inst, shared_attrs=['failure_count'])
|
| self.assertEqual(3, command_inst.failure_count)
|
| -
|
| +
|
| def testThreadsSurviveExceptionsInFuncSingleProcessSingleThread(self):
|
| self._TestThreadsSurviveExceptionsInFunc(1, 1)
|
|
|
| @@ -334,6 +358,7 @@
|
|
|
| @Timeout
|
| def testFailOnErrorFlag(self):
|
| + """Tests that fail_on_error produces the correct exception on failure."""
|
| def _ExpectCustomException(test_func):
|
| try:
|
| test_func()
|
| @@ -342,15 +367,13 @@
|
| except CustomException, e:
|
| pass
|
| except Exception, e:
|
| - self.fail("Got unexpected error: " + str(e))
|
| -
|
| + self.fail('Got unexpected error: ' + str(e))
|
| +
|
| def _RunFailureFunc():
|
| command_inst = self.command_class(True)
|
| args = ([()] * 5)
|
| - results = self._RunApply(_FailureFunc, args, 1, 1,
|
| - command_inst=command_inst,
|
| - shared_attrs=['failure_count'],
|
| - fail_on_error=True)
|
| + self._RunApply(_FailureFunc, args, 1, 1, command_inst=command_inst,
|
| + shared_attrs=['failure_count'], fail_on_error=True)
|
| _ExpectCustomException(_RunFailureFunc)
|
|
|
| def _RunFailingIteratorFirstPosition():
|
| @@ -390,12 +413,18 @@
|
| @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
|
| def testRecursiveDepthThreeDifferentFunctionsMultiProcessMultiThread(self):
|
| self._TestRecursiveDepthThreeDifferentFunctions(10, 10)
|
| -
|
| +
|
| @Timeout
|
| def _TestRecursiveDepthThreeDifferentFunctions(self, process_count,
|
| thread_count):
|
| - """Calls Apply(A), where A calls Apply(B) followed by Apply(C) and B calls
|
| - Apply(C).
|
| + """Tests recursive application of Apply.
|
| +
|
| + Calls Apply(A), where A calls Apply(B) followed by Apply(C) and B calls
|
| + Apply(C).
|
| +
|
| + Args:
|
| + process_count: Number of processes to use.
|
| + thread_count: Number of threads to use.
|
| """
|
| args = ([3, 1, 4, 1, 5])
|
| results = self._RunApply(_ReApplyWithReplicatedArguments, args,
|
| @@ -422,19 +451,21 @@
|
| args = self # The ProducerThread will try and fail to iterate over this.
|
| try:
|
| self._RunApply(_ReturnOneValue, args, process_count, thread_count)
|
| - self.fail("Did not throw expected exception.")
|
| - except TypeError, e:
|
| + self.fail('Did not raise expected exception.')
|
| + except TypeError:
|
| pass
|
| -
|
| +
|
| def testSkippedArgumentsSingleThreadSingleProcess(self):
|
| self._TestSkippedArguments(1, 1)
|
| -
|
| +
|
| def testSkippedArgumentsMultiThreadSingleProcess(self):
|
| self._TestSkippedArguments(1, 10)
|
| -
|
| +
|
| + @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
|
| def testSkippedArgumentsSingleThreadMultiProcess(self):
|
| self._TestSkippedArguments(10, 1)
|
| -
|
| +
|
| + @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
|
| def testSkippedArgumentsMultiThreadMultiProcess(self):
|
| self._TestSkippedArguments(10, 10)
|
|
|
| @@ -457,13 +488,14 @@
|
|
|
|
|
| class TestParallelismFrameworkWithoutMultiprocessing(TestParallelismFramework):
|
| - """Tests that the parallelism framework works when the multiprocessing module
|
| - is not available. Notably, this test has no way to override previous calls
|
| - to gslib.util.MultiprocessingIsAvailable to prevent the initialization of
|
| - all of the global variables in command.py, so this still behaves slightly
|
| - differently than the behavior one would see on a machine where the
|
| - multiprocessing functionality is actually not available (in particular, it
|
| - will not catch the case where a global variable that is not available for
|
| - the sequential path is referenced before initialization).
|
| + """Tests parallelism framework works with multiprocessing module unavailable.
|
| +
|
| + Notably, this test has no way to override previous calls
|
| + to gslib.util.MultiprocessingIsAvailable to prevent the initialization of
|
| + all of the global variables in command.py, so this still behaves slightly
|
| + differently than the behavior one would see on a machine where the
|
| + multiprocessing functionality is actually not available (in particular, it
|
| + will not catch the case where a global variable that is not available for
|
| + the sequential path is referenced before initialization).
|
| """
|
| command_class = FakeCommandWithoutMultiprocessingModule
|
|
|