Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(133)

Unified Diff: gslib/tests/test_parallelism_framework.py

Issue 698893003: Update checked in version of gsutil to version 4.6 (Closed) Base URL: http://dart.googlecode.com/svn/third_party/gsutil/
Patch Set: Created 6 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « gslib/tests/test_parallel_cp.py ('k') | gslib/tests/test_perfdiag.py » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: gslib/tests/test_parallelism_framework.py
===================================================================
--- gslib/tests/test_parallelism_framework.py (revision 33376)
+++ gslib/tests/test_parallelism_framework.py (working copy)
@@ -1,3 +1,4 @@
+# -*- coding: utf-8 -*-
# Copyright 2013 Google Inc. All Rights Reserved.
#
# Permission is hereby granted, free of charge, to any person obtaining a
@@ -18,26 +19,19 @@
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
-
"""Unit tests for gsutil parallelism framework."""
+from __future__ import absolute_import
+
import functools
-import gslib.help_provider as help_provider
-import gslib.tests.testcase as testcase
import signal
+from boto.storage_uri import BucketStorageUri
+from gslib import cs_api_map
from gslib.command import Command
-from gslib.command import COMMAND_NAME
-from gslib.command import COMMAND_NAME_ALIASES
+from gslib.command import CreateGsutilLogger
from gslib.command import DummyArgChecker
-from gslib.command import CreateGsutilLogger
-from gslib.commands.lifecycle import LifecycleCommand
-from gslib.help_provider import HELP_NAME
-from gslib.help_provider import HELP_NAME_ALIASES
-from gslib.help_provider import HELP_ONE_LINE_SUMMARY
-from gslib.help_provider import HELP_TEXT
-from gslib.help_provider import HelpType
-from gslib.help_provider import HELP_TYPE
+import gslib.tests.testcase as testcase
from gslib.tests.util import unittest
from gslib.util import IS_WINDOWS
from gslib.util import MultiprocessingIsAvailable
@@ -46,7 +40,7 @@
def Timeout(func):
"""Decorator used to provide a timeout for functions."""
@functools.wraps(func)
- def wrapper(*args, **kwargs):
+ def Wrapper(*args, **kwargs):
if not IS_WINDOWS:
signal.signal(signal.SIGALRM, _HandleAlarm)
signal.alarm(5)
@@ -55,33 +49,42 @@
finally:
if not IS_WINDOWS:
signal.alarm(0) # Cancel the alarm.
- return wrapper
+ return Wrapper
+
+# pylint: disable=unused-argument
def _HandleAlarm(signal_num, cur_stack_frame):
- raise Exception("Test timed out.")
+ raise Exception('Test timed out.')
+
class CustomException(Exception):
+
def __init__(self, exception_str):
super(CustomException, self).__init__(exception_str)
-def _ReturnOneValue(cls, args):
+def _ReturnOneValue(cls, args, thread_state=None):
return 1
-def _FailureFunc(cls, args):
- raise CustomException("Failing on purpose.")
+def _FailureFunc(cls, args, thread_state=None):
+ raise CustomException('Failing on purpose.')
+
+
def _FailingExceptionHandler(cls, e):
cls.failure_count += 1
- raise CustomException("Exception handler failing on purpose.")
+ raise CustomException('Exception handler failing on purpose.')
+
def _ExceptionHandler(cls, e):
cls.logger.exception(e)
cls.failure_count += 1
-def _IncrementByLength(cls, args):
+
+def _IncrementByLength(cls, args, thread_state=None):
cls.arg_length_sum += len(args)
+
def _AdjustProcessCountIfWindows(process_count):
if IS_WINDOWS:
return 1
@@ -89,7 +92,8 @@
return process_count
-def _ReApplyWithReplicatedArguments(cls, args):
+def _ReApplyWithReplicatedArguments(cls, args, thread_state=None):
+ """Calls Apply with arguments repeated seven times."""
new_args = [args] * 7
process_count = _AdjustProcessCountIfWindows(2)
return_values = cls.Apply(_PerformNRecursiveCalls, new_args,
@@ -97,16 +101,16 @@
process_count=process_count, thread_count=2,
should_return_results=True)
ret = sum(return_values)
-
+
return_values = cls.Apply(_ReturnOneValue, new_args,
_ExceptionHandler, arg_checker=DummyArgChecker,
process_count=process_count, thread_count=2,
should_return_results=True)
-
+
return len(return_values) + ret
-def _PerformNRecursiveCalls(cls, args):
+def _PerformNRecursiveCalls(cls, args, thread_state=None):
process_count = _AdjustProcessCountIfWindows(2)
return_values = cls.Apply(_ReturnOneValue, [()] * args, _ExceptionHandler,
arg_checker=DummyArgChecker,
@@ -120,6 +124,7 @@
class FailingIterator(object):
+
def __init__(self, size, failure_indices):
self.size = size
self.failure_indices = failure_indices
@@ -141,26 +146,42 @@
class FakeCommand(Command):
- command_spec = {
- COMMAND_NAME : 'fake',
- COMMAND_NAME_ALIASES : [],
- }
- help_spec = {
- HELP_NAME : 'fake',
- HELP_NAME_ALIASES : [],
- HELP_TYPE : HelpType.COMMAND_HELP,
- HELP_ONE_LINE_SUMMARY : 'Something to take up space.',
- HELP_TEXT : 'Something else to take up space.',
- }
+ """Fake command class for overriding command instance state."""
+ command_spec = Command.CreateCommandSpec(
+ 'fake',
+ command_name_aliases=[],
+ )
+ # Help specification. See help_provider.py for documentation.
+ help_spec = Command.HelpSpec(
+ help_name='fake',
+ help_name_aliases=[],
+ help_type='command_help',
+ help_one_line_summary='Something to take up space.',
+ help_text='Something else to take up space.',
+ subcommand_help_text={},
+ )
def __init__(self, do_parallel):
+ self.bucket_storage_uri_class = BucketStorageUri
+ support_map = {
+ 'gs': ['JSON'],
+ 's3': ['XML']
+ }
+ default_map = {
+ 'gs': 'JSON',
+ 's3': 'XML'
+ }
+ self.gsutil_api_map = cs_api_map.GsutilApiMapFactory.GetApiMap(
+ cs_api_map.GsutilApiClassMapFactory, support_map, default_map)
self.logger = CreateGsutilLogger('FakeCommand')
self.parallel_operations = do_parallel
self.failure_count = 0
self.multiprocessing_is_available = MultiprocessingIsAvailable()[0]
+ self.debug = 0
class FakeCommandWithoutMultiprocessingModule(FakeCommand):
+
def __init__(self, do_parallel):
super(FakeCommandWithoutMultiprocessingModule, self).__init__(do_parallel)
self.multiprocessing_is_available = False
@@ -170,7 +191,7 @@
# and also that ctrl+C works when there are still tasks enqueued.
class TestParallelismFramework(testcase.GsUtilUnitTestCase):
"""gsutil parallelism framework test suite."""
-
+
command_class = FakeCommand
def _RunApply(self, func, args_iterator, process_count, thread_count,
@@ -178,6 +199,7 @@
thr_exc_handler=None, arg_checker=DummyArgChecker):
command_inst = command_inst or self.command_class(True)
exception_handler = thr_exc_handler or _ExceptionHandler
+
return command_inst.Apply(func, args_iterator, exception_handler,
thread_count=thread_count,
process_count=process_count,
@@ -223,6 +245,7 @@
@Timeout
def _TestIteratorFailure(self, process_count, thread_count):
+ """Tests apply with a failing iterator."""
# Tests for fail_on_error == False.
args = FailingIterator(10, [0])
@@ -236,7 +259,7 @@
args = FailingIterator(10, [9])
results = self._RunApply(_ReturnOneValue, args, process_count, thread_count)
self.assertEqual(9, len(results))
-
+
if process_count * thread_count > 1:
# In this case, we should ignore the fail_on_error flag.
args = FailingIterator(10, [9])
@@ -247,7 +270,7 @@
args = FailingIterator(10, range(10))
results = self._RunApply(_ReturnOneValue, args, process_count, thread_count)
self.assertEqual(0, len(results))
-
+
args = FailingIterator(0, [])
results = self._RunApply(_ReturnOneValue, args, process_count, thread_count)
self.assertEqual(0, len(results))
@@ -268,6 +291,7 @@
@Timeout
def _TestSharedAttrsWork(self, process_count, thread_count):
+ """Tests that Apply successfully uses shared_attrs."""
command_inst = self.command_class(True)
command_inst.arg_length_sum = 19
args = ['foo', ['bar', 'baz'], [], ['x', 'y'], [], 'abcd']
@@ -285,7 +309,7 @@
self._RunApply(_ReturnOneValue, args, process_count, thread_count,
command_inst=command_inst, shared_attrs=['failure_count'])
self.assertEqual(3, command_inst.failure_count)
-
+
def testThreadsSurviveExceptionsInFuncSingleProcessSingleThread(self):
self._TestThreadsSurviveExceptionsInFunc(1, 1)
@@ -334,6 +358,7 @@
@Timeout
def testFailOnErrorFlag(self):
+ """Tests that fail_on_error produces the correct exception on failure."""
def _ExpectCustomException(test_func):
try:
test_func()
@@ -342,15 +367,13 @@
except CustomException, e:
pass
except Exception, e:
- self.fail("Got unexpected error: " + str(e))
-
+ self.fail('Got unexpected error: ' + str(e))
+
def _RunFailureFunc():
command_inst = self.command_class(True)
args = ([()] * 5)
- results = self._RunApply(_FailureFunc, args, 1, 1,
- command_inst=command_inst,
- shared_attrs=['failure_count'],
- fail_on_error=True)
+ self._RunApply(_FailureFunc, args, 1, 1, command_inst=command_inst,
+ shared_attrs=['failure_count'], fail_on_error=True)
_ExpectCustomException(_RunFailureFunc)
def _RunFailingIteratorFirstPosition():
@@ -390,12 +413,18 @@
@unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
def testRecursiveDepthThreeDifferentFunctionsMultiProcessMultiThread(self):
self._TestRecursiveDepthThreeDifferentFunctions(10, 10)
-
+
@Timeout
def _TestRecursiveDepthThreeDifferentFunctions(self, process_count,
thread_count):
- """Calls Apply(A), where A calls Apply(B) followed by Apply(C) and B calls
- Apply(C).
+ """Tests recursive application of Apply.
+
+ Calls Apply(A), where A calls Apply(B) followed by Apply(C) and B calls
+ Apply(C).
+
+ Args:
+ process_count: Number of processes to use.
+ thread_count: Number of threads to use.
"""
args = ([3, 1, 4, 1, 5])
results = self._RunApply(_ReApplyWithReplicatedArguments, args,
@@ -422,19 +451,21 @@
args = self # The ProducerThread will try and fail to iterate over this.
try:
self._RunApply(_ReturnOneValue, args, process_count, thread_count)
- self.fail("Did not throw expected exception.")
- except TypeError, e:
+ self.fail('Did not raise expected exception.')
+ except TypeError:
pass
-
+
def testSkippedArgumentsSingleThreadSingleProcess(self):
self._TestSkippedArguments(1, 1)
-
+
def testSkippedArgumentsMultiThreadSingleProcess(self):
self._TestSkippedArguments(1, 10)
-
+
+ @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
def testSkippedArgumentsSingleThreadMultiProcess(self):
self._TestSkippedArguments(10, 1)
-
+
+ @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
def testSkippedArgumentsMultiThreadMultiProcess(self):
self._TestSkippedArguments(10, 10)
@@ -457,13 +488,14 @@
class TestParallelismFrameworkWithoutMultiprocessing(TestParallelismFramework):
- """Tests that the parallelism framework works when the multiprocessing module
- is not available. Notably, this test has no way to override previous calls
- to gslib.util.MultiprocessingIsAvailable to prevent the initialization of
- all of the global variables in command.py, so this still behaves slightly
- differently than the behavior one would see on a machine where the
- multiprocessing functionality is actually not available (in particular, it
- will not catch the case where a global variable that is not available for
- the sequential path is referenced before initialization).
+ """Tests parallelism framework works with multiprocessing module unavailable.
+
+ Notably, this test has no way to override previous calls
+ to gslib.util.MultiprocessingIsAvailable to prevent the initialization of
+ all of the global variables in command.py, so this still behaves slightly
+ differently than the behavior one would see on a machine where the
+ multiprocessing functionality is actually not available (in particular, it
+ will not catch the case where a global variable that is not available for
+ the sequential path is referenced before initialization).
"""
command_class = FakeCommandWithoutMultiprocessingModule
« no previous file with comments | « gslib/tests/test_parallel_cp.py ('k') | gslib/tests/test_perfdiag.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698