Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(238)

Unified Diff: tools/telemetry/third_party/gsutil/gslib/tests/test_parallelism_framework.py

Issue 1260493004: Revert "Add gsutil 4.13 to telemetry/third_party" (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 5 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: tools/telemetry/third_party/gsutil/gslib/tests/test_parallelism_framework.py
diff --git a/tools/telemetry/third_party/gsutil/gslib/tests/test_parallelism_framework.py b/tools/telemetry/third_party/gsutil/gslib/tests/test_parallelism_framework.py
deleted file mode 100644
index e895c1680a4c99fcfbaa116fc9f95cc3af06c53b..0000000000000000000000000000000000000000
--- a/tools/telemetry/third_party/gsutil/gslib/tests/test_parallelism_framework.py
+++ /dev/null
@@ -1,574 +0,0 @@
-# -*- coding: utf-8 -*-
-# Copyright 2013 Google Inc. All Rights Reserved.
-#
-# Permission is hereby granted, free of charge, to any person obtaining a
-# copy of this software and associated documentation files (the
-# "Software"), to deal in the Software without restriction, including
-# without limitation the rights to use, copy, modify, merge, publish, dis-
-# tribute, sublicense, and/or sell copies of the Software, and to permit
-# persons to whom the Software is furnished to do so, subject to the fol-
-# lowing conditions:
-#
-# The above copyright notice and this permission notice shall be included
-# in all copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
-# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
-# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
-# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
-# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
-# IN THE SOFTWARE.
-"""Unit tests for gsutil parallelism framework."""
-
-from __future__ import absolute_import
-
-import functools
-import signal
-
-from boto.storage_uri import BucketStorageUri
-from gslib import cs_api_map
-from gslib.command import Command
-from gslib.command import CreateGsutilLogger
-from gslib.command import DummyArgChecker
-import gslib.tests.testcase as testcase
-from gslib.tests.testcase.base import RequiresIsolation
-from gslib.tests.util import unittest
-from gslib.util import IS_WINDOWS
-from gslib.util import MultiprocessingIsAvailable
-
-
-# Amount of time for an individual test to run before timing out. We need a
-# reasonably high value since if many tests are running in parallel, an
-# individual test may take a while to complete.
-_TEST_TIMEOUT_SECONDS = 120
-
-
-def Timeout(func):
- """Decorator used to provide a timeout for functions."""
- @functools.wraps(func)
- def Wrapper(*args, **kwargs):
- if not IS_WINDOWS:
- signal.signal(signal.SIGALRM, _HandleAlarm)
- signal.alarm(_TEST_TIMEOUT_SECONDS)
- try:
- func(*args, **kwargs)
- finally:
- if not IS_WINDOWS:
- signal.alarm(0) # Cancel the alarm.
- return Wrapper
-
-
-# pylint: disable=unused-argument
-def _HandleAlarm(signal_num, cur_stack_frame):
- raise Exception('Test timed out.')
-
-
-class CustomException(Exception):
-
- def __init__(self, exception_str):
- super(CustomException, self).__init__(exception_str)
-
-
-def _ReturnOneValue(cls, args, thread_state=None):
- return 1
-
-
-def _FailureFunc(cls, args, thread_state=None):
- raise CustomException('Failing on purpose.')
-
-
-def _FailingExceptionHandler(cls, e):
- cls.failure_count += 1
- raise CustomException('Exception handler failing on purpose.')
-
-
-def _ExceptionHandler(cls, e):
- cls.logger.exception(e)
- cls.failure_count += 1
-
-
-def _IncrementByLength(cls, args, thread_state=None):
- cls.arg_length_sum += len(args)
-
-
-def _AdjustProcessCountIfWindows(process_count):
- if IS_WINDOWS:
- return 1
- else:
- return process_count
-
-
-def _ReApplyWithReplicatedArguments(cls, args, thread_state=None):
- """Calls Apply with arguments repeated seven times.
-
- The first two elements of args should be the process and thread counts,
- respectively, to be used for the recursive calls.
-
- Args:
- cls: The Command class to call Apply on.
- args: Arguments to pass to Apply.
- thread_state: Unused, required by function signature.
-
- Returns:
- Number of values returned by the two calls to Apply.
- """
- new_args = [args] * 7
- process_count = _AdjustProcessCountIfWindows(args[0])
- thread_count = args[1]
- return_values = cls.Apply(_PerformNRecursiveCalls, new_args,
- _ExceptionHandler, arg_checker=DummyArgChecker,
- process_count=process_count,
- thread_count=thread_count,
- should_return_results=True)
- ret = sum(return_values)
-
- return_values = cls.Apply(_ReturnOneValue, new_args,
- _ExceptionHandler, arg_checker=DummyArgChecker,
- process_count=process_count,
- thread_count=thread_count,
- should_return_results=True)
-
- return len(return_values) + ret
-
-
-def _PerformNRecursiveCalls(cls, args, thread_state=None):
- """Calls Apply to perform N recursive calls.
-
- The first two elements of args should be the process and thread counts,
- respectively, to be used for the recursive calls, while N is the third element
- (the number of recursive calls to make).
-
- Args:
- cls: The Command class to call Apply on.
- args: Arguments to pass to Apply.
- thread_state: Unused, required by function signature.
-
- Returns:
- Number of values returned by the call to Apply.
- """
- process_count = _AdjustProcessCountIfWindows(args[0])
- thread_count = args[1]
- return_values = cls.Apply(_ReturnOneValue, [()] * args[2], _ExceptionHandler,
- arg_checker=DummyArgChecker,
- process_count=process_count,
- thread_count=thread_count,
- should_return_results=True)
- return len(return_values)
-
-
-def _SkipEvenNumbersArgChecker(cls, arg):
- return arg % 2 != 0
-
-
-class FailingIterator(object):
-
- def __init__(self, size, failure_indices):
- self.size = size
- self.failure_indices = failure_indices
- self.current_index = 0
-
- def __iter__(self):
- return self
-
- def next(self):
- if self.current_index == self.size:
- raise StopIteration('')
- elif self.current_index in self.failure_indices:
- self.current_index += 1
- raise CustomException(
- 'Iterator failing on purpose at index %d.' % self.current_index)
- else:
- self.current_index += 1
- return self.current_index - 1
-
-
-class FakeCommand(Command):
- """Fake command class for overriding command instance state."""
- command_spec = Command.CreateCommandSpec(
- 'fake',
- command_name_aliases=[],
- )
- # Help specification. See help_provider.py for documentation.
- help_spec = Command.HelpSpec(
- help_name='fake',
- help_name_aliases=[],
- help_type='command_help',
- help_one_line_summary='Something to take up space.',
- help_text='Something else to take up space.',
- subcommand_help_text={},
- )
-
- def __init__(self, do_parallel):
- self.bucket_storage_uri_class = BucketStorageUri
- support_map = {
- 'gs': ['JSON'],
- 's3': ['XML']
- }
- default_map = {
- 'gs': 'JSON',
- 's3': 'XML'
- }
- self.gsutil_api_map = cs_api_map.GsutilApiMapFactory.GetApiMap(
- cs_api_map.GsutilApiClassMapFactory, support_map, default_map)
- self.logger = CreateGsutilLogger('FakeCommand')
- self.parallel_operations = do_parallel
- self.failure_count = 0
- self.multiprocessing_is_available = MultiprocessingIsAvailable()[0]
- self.debug = 0
-
-
-class FakeCommandWithoutMultiprocessingModule(FakeCommand):
-
- def __init__(self, do_parallel):
- super(FakeCommandWithoutMultiprocessingModule, self).__init__(do_parallel)
- self.multiprocessing_is_available = False
-
-
-# TODO: Figure out a good way to test that ctrl+C really stops execution,
-# and also that ctrl+C works when there are still tasks enqueued.
-class TestParallelismFramework(testcase.GsUtilUnitTestCase):
- """gsutil parallelism framework test suite."""
-
- command_class = FakeCommand
-
- def _RunApply(self, func, args_iterator, process_count, thread_count,
- command_inst=None, shared_attrs=None, fail_on_error=False,
- thr_exc_handler=None, arg_checker=DummyArgChecker):
- command_inst = command_inst or self.command_class(True)
- exception_handler = thr_exc_handler or _ExceptionHandler
-
- return command_inst.Apply(func, args_iterator, exception_handler,
- thread_count=thread_count,
- process_count=process_count,
- arg_checker=arg_checker,
- should_return_results=True,
- shared_attrs=shared_attrs,
- fail_on_error=fail_on_error)
-
- @RequiresIsolation
- def testBasicApplySingleProcessSingleThread(self):
- self._TestBasicApply(1, 1)
-
- @RequiresIsolation
- def testBasicApplySingleProcessMultiThread(self):
- self._TestBasicApply(1, 3)
-
- @RequiresIsolation
- @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
- def testBasicApplyMultiProcessSingleThread(self):
- self._TestBasicApply(3, 1)
-
- @RequiresIsolation
- @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
- def testBasicApplyMultiProcessMultiThread(self):
- self._TestBasicApply(3, 3)
-
- @Timeout
- def _TestBasicApply(self, process_count, thread_count):
- args = [()] * (17 * process_count * thread_count + 1)
-
- results = self._RunApply(_ReturnOneValue, args, process_count, thread_count)
- self.assertEqual(len(args), len(results))
-
- @RequiresIsolation
- def testIteratorFailureSingleProcessSingleThread(self):
- self._TestIteratorFailure(1, 1)
-
- @RequiresIsolation
- def testIteratorFailureSingleProcessMultiThread(self):
- self._TestIteratorFailure(1, 3)
-
- @RequiresIsolation
- @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
- def testIteratorFailureMultiProcessSingleThread(self):
- self._TestIteratorFailure(3, 1)
-
- @RequiresIsolation
- @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
- def testIteratorFailureMultiProcessMultiThread(self):
- self._TestIteratorFailure(3, 3)
-
- @Timeout
- def _TestIteratorFailure(self, process_count, thread_count):
- """Tests apply with a failing iterator."""
- # Tests for fail_on_error == False.
-
- args = FailingIterator(10, [0])
- results = self._RunApply(_ReturnOneValue, args, process_count, thread_count)
- self.assertEqual(9, len(results))
-
- args = FailingIterator(10, [5])
- results = self._RunApply(_ReturnOneValue, args, process_count, thread_count)
- self.assertEqual(9, len(results))
-
- args = FailingIterator(10, [9])
- results = self._RunApply(_ReturnOneValue, args, process_count, thread_count)
- self.assertEqual(9, len(results))
-
- if process_count * thread_count > 1:
- # In this case, we should ignore the fail_on_error flag.
- args = FailingIterator(10, [9])
- results = self._RunApply(_ReturnOneValue, args, process_count,
- thread_count, fail_on_error=True)
- self.assertEqual(9, len(results))
-
- args = FailingIterator(10, range(10))
- results = self._RunApply(_ReturnOneValue, args, process_count, thread_count)
- self.assertEqual(0, len(results))
-
- args = FailingIterator(0, [])
- results = self._RunApply(_ReturnOneValue, args, process_count, thread_count)
- self.assertEqual(0, len(results))
-
- @RequiresIsolation
- def testTestSharedAttrsWorkSingleProcessSingleThread(self):
- self._TestSharedAttrsWork(1, 1)
-
- @RequiresIsolation
- def testTestSharedAttrsWorkSingleProcessMultiThread(self):
- self._TestSharedAttrsWork(1, 3)
-
- @RequiresIsolation
- @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
- def testTestSharedAttrsWorkMultiProcessSingleThread(self):
- self._TestSharedAttrsWork(3, 1)
-
- @RequiresIsolation
- @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
- def testTestSharedAttrsWorkMultiProcessMultiThread(self):
- self._TestSharedAttrsWork(3, 3)
-
- @Timeout
- def _TestSharedAttrsWork(self, process_count, thread_count):
- """Tests that Apply successfully uses shared_attrs."""
- command_inst = self.command_class(True)
- command_inst.arg_length_sum = 19
- args = ['foo', ['bar', 'baz'], [], ['x', 'y'], [], 'abcd']
- self._RunApply(_IncrementByLength, args, process_count,
- thread_count, command_inst=command_inst,
- shared_attrs=['arg_length_sum'])
- expected_sum = 19
- for arg in args:
- expected_sum += len(arg)
- self.assertEqual(expected_sum, command_inst.arg_length_sum)
-
- # Test that shared variables work when the iterator fails.
- command_inst = self.command_class(True)
- args = FailingIterator(10, [1, 3, 5])
- self._RunApply(_ReturnOneValue, args, process_count, thread_count,
- command_inst=command_inst, shared_attrs=['failure_count'])
- self.assertEqual(3, command_inst.failure_count)
-
- @RequiresIsolation
- def testThreadsSurviveExceptionsInFuncSingleProcessSingleThread(self):
- self._TestThreadsSurviveExceptionsInFunc(1, 1)
-
- @RequiresIsolation
- def testThreadsSurviveExceptionsInFuncSingleProcessMultiThread(self):
- self._TestThreadsSurviveExceptionsInFunc(1, 3)
-
- @RequiresIsolation
- @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
- def testThreadsSurviveExceptionsInFuncMultiProcessSingleThread(self):
- self._TestThreadsSurviveExceptionsInFunc(3, 1)
-
- @RequiresIsolation
- @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
- def testThreadsSurviveExceptionsInFuncMultiProcessMultiThread(self):
- self._TestThreadsSurviveExceptionsInFunc(3, 3)
-
- @Timeout
- def _TestThreadsSurviveExceptionsInFunc(self, process_count, thread_count):
- command_inst = self.command_class(True)
- args = ([()] * 5)
- self._RunApply(_FailureFunc, args, process_count, thread_count,
- command_inst=command_inst, shared_attrs=['failure_count'],
- thr_exc_handler=_FailingExceptionHandler)
- self.assertEqual(len(args), command_inst.failure_count)
-
- @RequiresIsolation
- def testThreadsSurviveExceptionsInHandlerSingleProcessSingleThread(self):
- self._TestThreadsSurviveExceptionsInHandler(1, 1)
-
- @RequiresIsolation
- def testThreadsSurviveExceptionsInHandlerSingleProcessMultiThread(self):
- self._TestThreadsSurviveExceptionsInHandler(1, 3)
-
- @RequiresIsolation
- @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
- def testThreadsSurviveExceptionsInHandlerMultiProcessSingleThread(self):
- self._TestThreadsSurviveExceptionsInHandler(3, 1)
-
- @RequiresIsolation
- @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
- def testThreadsSurviveExceptionsInHandlerMultiProcessMultiThread(self):
- self._TestThreadsSurviveExceptionsInHandler(3, 3)
-
- @Timeout
- def _TestThreadsSurviveExceptionsInHandler(self, process_count, thread_count):
- command_inst = self.command_class(True)
- args = ([()] * 5)
- self._RunApply(_FailureFunc, args, process_count, thread_count,
- command_inst=command_inst, shared_attrs=['failure_count'],
- thr_exc_handler=_FailingExceptionHandler)
- self.assertEqual(len(args), command_inst.failure_count)
-
- @RequiresIsolation
- @Timeout
- def testFailOnErrorFlag(self):
- """Tests that fail_on_error produces the correct exception on failure."""
- def _ExpectCustomException(test_func):
- try:
- test_func()
- self.fail(
- 'Setting fail_on_error should raise any exception encountered.')
- except CustomException, e:
- pass
- except Exception, e:
- self.fail('Got unexpected error: ' + str(e))
-
- def _RunFailureFunc():
- command_inst = self.command_class(True)
- args = ([()] * 5)
- self._RunApply(_FailureFunc, args, 1, 1, command_inst=command_inst,
- shared_attrs=['failure_count'], fail_on_error=True)
- _ExpectCustomException(_RunFailureFunc)
-
- def _RunFailingIteratorFirstPosition():
- args = FailingIterator(10, [0])
- results = self._RunApply(_ReturnOneValue, args, 1, 1, fail_on_error=True)
- self.assertEqual(0, len(results))
- _ExpectCustomException(_RunFailingIteratorFirstPosition)
-
- def _RunFailingIteratorPositionMiddlePosition():
- args = FailingIterator(10, [5])
- results = self._RunApply(_ReturnOneValue, args, 1, 1, fail_on_error=True)
- self.assertEqual(5, len(results))
- _ExpectCustomException(_RunFailingIteratorPositionMiddlePosition)
-
- def _RunFailingIteratorLastPosition():
- args = FailingIterator(10, [9])
- results = self._RunApply(_ReturnOneValue, args, 1, 1, fail_on_error=True)
- self.assertEqual(9, len(results))
- _ExpectCustomException(_RunFailingIteratorLastPosition)
-
- def _RunFailingIteratorMultiplePositions():
- args = FailingIterator(10, [1, 3, 5])
- results = self._RunApply(_ReturnOneValue, args, 1, 1, fail_on_error=True)
- self.assertEqual(1, len(results))
- _ExpectCustomException(_RunFailingIteratorMultiplePositions)
-
- @RequiresIsolation
- def testRecursiveDepthThreeDifferentFunctionsSingleProcessSingleThread(self):
- self._TestRecursiveDepthThreeDifferentFunctions(1, 1)
-
- @RequiresIsolation
- def testRecursiveDepthThreeDifferentFunctionsSingleProcessMultiThread(self):
- self._TestRecursiveDepthThreeDifferentFunctions(1, 3)
-
- @RequiresIsolation
- @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
- def testRecursiveDepthThreeDifferentFunctionsMultiProcessSingleThread(self):
- self._TestRecursiveDepthThreeDifferentFunctions(3, 1)
-
- @RequiresIsolation
- @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
- def testRecursiveDepthThreeDifferentFunctionsMultiProcessMultiThread(self):
- self._TestRecursiveDepthThreeDifferentFunctions(3, 3)
-
- @Timeout
- def _TestRecursiveDepthThreeDifferentFunctions(self, process_count,
- thread_count):
- """Tests recursive application of Apply.
-
- Calls Apply(A), where A calls Apply(B) followed by Apply(C) and B calls
- Apply(C).
-
- Args:
- process_count: Number of processes to use.
- thread_count: Number of threads to use.
- """
- base_args = [3, 1, 4, 1, 5]
- args = [[process_count, thread_count, count] for count in base_args]
-
- results = self._RunApply(_ReApplyWithReplicatedArguments, args,
- process_count, thread_count)
- self.assertEqual(7 * (sum(base_args) + len(base_args)), sum(results))
-
- @RequiresIsolation
- def testExceptionInProducerRaisesAndTerminatesSingleProcessSingleThread(self):
- self._TestExceptionInProducerRaisesAndTerminates(1, 1)
-
- @RequiresIsolation
- def testExceptionInProducerRaisesAndTerminatesSingleProcessMultiThread(self):
- self._TestExceptionInProducerRaisesAndTerminates(1, 3)
-
- @RequiresIsolation
- @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
- def testExceptionInProducerRaisesAndTerminatesMultiProcessSingleThread(self):
- self._TestExceptionInProducerRaisesAndTerminates(3, 1)
-
- @RequiresIsolation
- @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
- def testExceptionInProducerRaisesAndTerminatesMultiProcessMultiThread(self):
- self._TestExceptionInProducerRaisesAndTerminates(3, 3)
-
- @Timeout
- def _TestExceptionInProducerRaisesAndTerminates(self, process_count,
- thread_count):
- args = self # The ProducerThread will try and fail to iterate over this.
- try:
- self._RunApply(_ReturnOneValue, args, process_count, thread_count)
- self.fail('Did not raise expected exception.')
- except TypeError:
- pass
-
- @RequiresIsolation
- def testSkippedArgumentsSingleThreadSingleProcess(self):
- self._TestSkippedArguments(1, 1)
-
- @RequiresIsolation
- def testSkippedArgumentsMultiThreadSingleProcess(self):
- self._TestSkippedArguments(1, 3)
-
- @RequiresIsolation
- @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
- def testSkippedArgumentsSingleThreadMultiProcess(self):
- self._TestSkippedArguments(3, 1)
-
- @RequiresIsolation
- @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
- def testSkippedArgumentsMultiThreadMultiProcess(self):
- self._TestSkippedArguments(3, 3)
-
- @Timeout
- def _TestSkippedArguments(self, process_count, thread_count):
-
- # Skip a proper subset of the arguments.
- n = 2 * process_count * thread_count
- args = range(1, n + 1)
- results = self._RunApply(_ReturnOneValue, args, process_count, thread_count,
- arg_checker=_SkipEvenNumbersArgChecker)
- self.assertEqual(n / 2, len(results)) # We know n is even.
- self.assertEqual(n / 2, sum(results))
-
- # Skip all arguments.
- args = [2 * x for x in args]
- results = self._RunApply(_ReturnOneValue, args, process_count, thread_count,
- arg_checker=_SkipEvenNumbersArgChecker)
- self.assertEqual(0, len(results))
-
-
-class TestParallelismFrameworkWithoutMultiprocessing(TestParallelismFramework):
- """Tests parallelism framework works with multiprocessing module unavailable.
-
- Notably, this test has no way to override previous calls
- to gslib.util.MultiprocessingIsAvailable to prevent the initialization of
- all of the global variables in command.py, so this still behaves slightly
- differently than the behavior one would see on a machine where the
- multiprocessing functionality is actually not available (in particular, it
- will not catch the case where a global variable that is not available for
- the sequential path is referenced before initialization).
- """
- command_class = FakeCommandWithoutMultiprocessingModule

Powered by Google App Engine
This is Rietveld 408576698