| OLD | NEW |
| 1 # -*- coding: utf-8 -*- |
| 1 # Copyright 2013 Google Inc. All Rights Reserved. | 2 # Copyright 2013 Google Inc. All Rights Reserved. |
| 2 # | 3 # |
| 3 # Permission is hereby granted, free of charge, to any person obtaining a | 4 # Permission is hereby granted, free of charge, to any person obtaining a |
| 4 # copy of this software and associated documentation files (the | 5 # copy of this software and associated documentation files (the |
| 5 # "Software"), to deal in the Software without restriction, including | 6 # "Software"), to deal in the Software without restriction, including |
| 6 # without limitation the rights to use, copy, modify, merge, publish, dis- | 7 # without limitation the rights to use, copy, modify, merge, publish, dis- |
| 7 # tribute, sublicense, and/or sell copies of the Software, and to permit | 8 # tribute, sublicense, and/or sell copies of the Software, and to permit |
| 8 # persons to whom the Software is furnished to do so, subject to the fol- | 9 # persons to whom the Software is furnished to do so, subject to the fol- |
| 9 # lowing conditions: | 10 # lowing conditions: |
| 10 # | 11 # |
| 11 # The above copyright notice and this permission notice shall be included | 12 # The above copyright notice and this permission notice shall be included |
| 12 # in all copies or substantial portions of the Software. | 13 # in all copies or substantial portions of the Software. |
| 13 # | 14 # |
| 14 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS | 15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS |
| 15 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- | 16 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- |
| 16 # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT | 17 # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT |
| 17 # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, | 18 # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, |
| 18 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | 19 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
| 19 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS | 20 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS |
| 20 # IN THE SOFTWARE. | 21 # IN THE SOFTWARE. |
| 21 | |
| 22 """Unit tests for gsutil parallelism framework.""" | 22 """Unit tests for gsutil parallelism framework.""" |
| 23 | 23 |
| 24 from __future__ import absolute_import |
| 25 |
| 24 import functools | 26 import functools |
| 25 import gslib.help_provider as help_provider | |
| 26 import gslib.tests.testcase as testcase | |
| 27 import signal | 27 import signal |
| 28 | 28 |
| 29 from boto.storage_uri import BucketStorageUri |
| 30 from gslib import cs_api_map |
| 29 from gslib.command import Command | 31 from gslib.command import Command |
| 30 from gslib.command import COMMAND_NAME | 32 from gslib.command import CreateGsutilLogger |
| 31 from gslib.command import COMMAND_NAME_ALIASES | |
| 32 from gslib.command import DummyArgChecker | 33 from gslib.command import DummyArgChecker |
| 33 from gslib.command import CreateGsutilLogger | 34 import gslib.tests.testcase as testcase |
| 34 from gslib.commands.lifecycle import LifecycleCommand | |
| 35 from gslib.help_provider import HELP_NAME | |
| 36 from gslib.help_provider import HELP_NAME_ALIASES | |
| 37 from gslib.help_provider import HELP_ONE_LINE_SUMMARY | |
| 38 from gslib.help_provider import HELP_TEXT | |
| 39 from gslib.help_provider import HelpType | |
| 40 from gslib.help_provider import HELP_TYPE | |
| 41 from gslib.tests.util import unittest | 35 from gslib.tests.util import unittest |
| 42 from gslib.util import IS_WINDOWS | 36 from gslib.util import IS_WINDOWS |
| 43 from gslib.util import MultiprocessingIsAvailable | 37 from gslib.util import MultiprocessingIsAvailable |
| 44 | 38 |
| 45 | 39 |
| 46 def Timeout(func): | 40 def Timeout(func): |
| 47 """Decorator used to provide a timeout for functions.""" | 41 """Decorator used to provide a timeout for functions.""" |
| 48 @functools.wraps(func) | 42 @functools.wraps(func) |
| 49 def wrapper(*args, **kwargs): | 43 def Wrapper(*args, **kwargs): |
| 50 if not IS_WINDOWS: | 44 if not IS_WINDOWS: |
| 51 signal.signal(signal.SIGALRM, _HandleAlarm) | 45 signal.signal(signal.SIGALRM, _HandleAlarm) |
| 52 signal.alarm(5) | 46 signal.alarm(5) |
| 53 try: | 47 try: |
| 54 func(*args, **kwargs) | 48 func(*args, **kwargs) |
| 55 finally: | 49 finally: |
| 56 if not IS_WINDOWS: | 50 if not IS_WINDOWS: |
| 57 signal.alarm(0) # Cancel the alarm. | 51 signal.alarm(0) # Cancel the alarm. |
| 58 return wrapper | 52 return Wrapper |
| 59 | 53 |
| 54 |
| 55 # pylint: disable=unused-argument |
| 60 def _HandleAlarm(signal_num, cur_stack_frame): | 56 def _HandleAlarm(signal_num, cur_stack_frame): |
| 61 raise Exception("Test timed out.") | 57 raise Exception('Test timed out.') |
| 58 |
| 62 | 59 |
| 63 class CustomException(Exception): | 60 class CustomException(Exception): |
| 61 |
| 64 def __init__(self, exception_str): | 62 def __init__(self, exception_str): |
| 65 super(CustomException, self).__init__(exception_str) | 63 super(CustomException, self).__init__(exception_str) |
| 66 | 64 |
| 67 | 65 |
| 68 def _ReturnOneValue(cls, args): | 66 def _ReturnOneValue(cls, args, thread_state=None): |
| 69 return 1 | 67 return 1 |
| 70 | 68 |
| 71 def _FailureFunc(cls, args): | 69 |
| 72 raise CustomException("Failing on purpose.") | 70 def _FailureFunc(cls, args, thread_state=None): |
| 71 raise CustomException('Failing on purpose.') |
| 72 |
| 73 | 73 |
| 74 def _FailingExceptionHandler(cls, e): | 74 def _FailingExceptionHandler(cls, e): |
| 75 cls.failure_count += 1 | 75 cls.failure_count += 1 |
| 76 raise CustomException("Exception handler failing on purpose.") | 76 raise CustomException('Exception handler failing on purpose.') |
| 77 |
| 77 | 78 |
| 78 def _ExceptionHandler(cls, e): | 79 def _ExceptionHandler(cls, e): |
| 79 cls.logger.exception(e) | 80 cls.logger.exception(e) |
| 80 cls.failure_count += 1 | 81 cls.failure_count += 1 |
| 81 | 82 |
| 82 def _IncrementByLength(cls, args): | 83 |
| 84 def _IncrementByLength(cls, args, thread_state=None): |
| 83 cls.arg_length_sum += len(args) | 85 cls.arg_length_sum += len(args) |
| 84 | 86 |
| 87 |
| 85 def _AdjustProcessCountIfWindows(process_count): | 88 def _AdjustProcessCountIfWindows(process_count): |
| 86 if IS_WINDOWS: | 89 if IS_WINDOWS: |
| 87 return 1 | 90 return 1 |
| 88 else: | 91 else: |
| 89 return process_count | 92 return process_count |
| 90 | 93 |
| 91 | 94 |
| 92 def _ReApplyWithReplicatedArguments(cls, args): | 95 def _ReApplyWithReplicatedArguments(cls, args, thread_state=None): |
| 96 """Calls Apply with arguments repeated seven times.""" |
| 93 new_args = [args] * 7 | 97 new_args = [args] * 7 |
| 94 process_count = _AdjustProcessCountIfWindows(2) | 98 process_count = _AdjustProcessCountIfWindows(2) |
| 95 return_values = cls.Apply(_PerformNRecursiveCalls, new_args, | 99 return_values = cls.Apply(_PerformNRecursiveCalls, new_args, |
| 96 _ExceptionHandler, arg_checker=DummyArgChecker, | 100 _ExceptionHandler, arg_checker=DummyArgChecker, |
| 97 process_count=process_count, thread_count=2, | 101 process_count=process_count, thread_count=2, |
| 98 should_return_results=True) | 102 should_return_results=True) |
| 99 ret = sum(return_values) | 103 ret = sum(return_values) |
| 100 | 104 |
| 101 return_values = cls.Apply(_ReturnOneValue, new_args, | 105 return_values = cls.Apply(_ReturnOneValue, new_args, |
| 102 _ExceptionHandler, arg_checker=DummyArgChecker, | 106 _ExceptionHandler, arg_checker=DummyArgChecker, |
| 103 process_count=process_count, thread_count=2, | 107 process_count=process_count, thread_count=2, |
| 104 should_return_results=True) | 108 should_return_results=True) |
| 105 | 109 |
| 106 return len(return_values) + ret | 110 return len(return_values) + ret |
| 107 | 111 |
| 108 | 112 |
| 109 def _PerformNRecursiveCalls(cls, args): | 113 def _PerformNRecursiveCalls(cls, args, thread_state=None): |
| 110 process_count = _AdjustProcessCountIfWindows(2) | 114 process_count = _AdjustProcessCountIfWindows(2) |
| 111 return_values = cls.Apply(_ReturnOneValue, [()] * args, _ExceptionHandler, | 115 return_values = cls.Apply(_ReturnOneValue, [()] * args, _ExceptionHandler, |
| 112 arg_checker=DummyArgChecker, | 116 arg_checker=DummyArgChecker, |
| 113 process_count=process_count, thread_count=2, | 117 process_count=process_count, thread_count=2, |
| 114 should_return_results=True) | 118 should_return_results=True) |
| 115 return len(return_values) | 119 return len(return_values) |
| 116 | 120 |
| 117 | 121 |
| 118 def _SkipEvenNumbersArgChecker(cls, arg): | 122 def _SkipEvenNumbersArgChecker(cls, arg): |
| 119 return arg % 2 != 0 | 123 return arg % 2 != 0 |
| 120 | 124 |
| 121 | 125 |
| 122 class FailingIterator(object): | 126 class FailingIterator(object): |
| 127 |
| 123 def __init__(self, size, failure_indices): | 128 def __init__(self, size, failure_indices): |
| 124 self.size = size | 129 self.size = size |
| 125 self.failure_indices = failure_indices | 130 self.failure_indices = failure_indices |
| 126 self.current_index = 0 | 131 self.current_index = 0 |
| 127 | 132 |
| 128 def __iter__(self): | 133 def __iter__(self): |
| 129 return self | 134 return self |
| 130 | 135 |
| 131 def next(self): | 136 def next(self): |
| 132 if self.current_index == self.size: | 137 if self.current_index == self.size: |
| 133 raise StopIteration('') | 138 raise StopIteration('') |
| 134 elif self.current_index in self.failure_indices: | 139 elif self.current_index in self.failure_indices: |
| 135 self.current_index += 1 | 140 self.current_index += 1 |
| 136 raise CustomException( | 141 raise CustomException( |
| 137 'Iterator failing on purpose at index %d.' % self.current_index) | 142 'Iterator failing on purpose at index %d.' % self.current_index) |
| 138 else: | 143 else: |
| 139 self.current_index += 1 | 144 self.current_index += 1 |
| 140 return self.current_index - 1 | 145 return self.current_index - 1 |
| 141 | 146 |
| 142 | 147 |
| 143 class FakeCommand(Command): | 148 class FakeCommand(Command): |
| 144 command_spec = { | 149 """Fake command class for overriding command instance state.""" |
| 145 COMMAND_NAME : 'fake', | 150 command_spec = Command.CreateCommandSpec( |
| 146 COMMAND_NAME_ALIASES : [], | 151 'fake', |
| 147 } | 152 command_name_aliases=[], |
| 148 help_spec = { | 153 ) |
| 149 HELP_NAME : 'fake', | 154 # Help specification. See help_provider.py for documentation. |
| 150 HELP_NAME_ALIASES : [], | 155 help_spec = Command.HelpSpec( |
| 151 HELP_TYPE : HelpType.COMMAND_HELP, | 156 help_name='fake', |
| 152 HELP_ONE_LINE_SUMMARY : 'Something to take up space.', | 157 help_name_aliases=[], |
| 153 HELP_TEXT : 'Something else to take up space.', | 158 help_type='command_help', |
| 154 } | 159 help_one_line_summary='Something to take up space.', |
| 160 help_text='Something else to take up space.', |
| 161 subcommand_help_text={}, |
| 162 ) |
| 155 | 163 |
| 156 def __init__(self, do_parallel): | 164 def __init__(self, do_parallel): |
| 165 self.bucket_storage_uri_class = BucketStorageUri |
| 166 support_map = { |
| 167 'gs': ['JSON'], |
| 168 's3': ['XML'] |
| 169 } |
| 170 default_map = { |
| 171 'gs': 'JSON', |
| 172 's3': 'XML' |
| 173 } |
| 174 self.gsutil_api_map = cs_api_map.GsutilApiMapFactory.GetApiMap( |
| 175 cs_api_map.GsutilApiClassMapFactory, support_map, default_map) |
| 157 self.logger = CreateGsutilLogger('FakeCommand') | 176 self.logger = CreateGsutilLogger('FakeCommand') |
| 158 self.parallel_operations = do_parallel | 177 self.parallel_operations = do_parallel |
| 159 self.failure_count = 0 | 178 self.failure_count = 0 |
| 160 self.multiprocessing_is_available = MultiprocessingIsAvailable()[0] | 179 self.multiprocessing_is_available = MultiprocessingIsAvailable()[0] |
| 180 self.debug = 0 |
| 161 | 181 |
| 162 | 182 |
| 163 class FakeCommandWithoutMultiprocessingModule(FakeCommand): | 183 class FakeCommandWithoutMultiprocessingModule(FakeCommand): |
| 184 |
| 164 def __init__(self, do_parallel): | 185 def __init__(self, do_parallel): |
| 165 super(FakeCommandWithoutMultiprocessingModule, self).__init__(do_parallel) | 186 super(FakeCommandWithoutMultiprocessingModule, self).__init__(do_parallel) |
| 166 self.multiprocessing_is_available = False | 187 self.multiprocessing_is_available = False |
| 167 | 188 |
| 168 | 189 |
| 169 # TODO: Figure out a good way to test that ctrl+C really stops execution, | 190 # TODO: Figure out a good way to test that ctrl+C really stops execution, |
| 170 # and also that ctrl+C works when there are still tasks enqueued. | 191 # and also that ctrl+C works when there are still tasks enqueued. |
| 171 class TestParallelismFramework(testcase.GsUtilUnitTestCase): | 192 class TestParallelismFramework(testcase.GsUtilUnitTestCase): |
| 172 """gsutil parallelism framework test suite.""" | 193 """gsutil parallelism framework test suite.""" |
| 173 | 194 |
| 174 command_class = FakeCommand | 195 command_class = FakeCommand |
| 175 | 196 |
| 176 def _RunApply(self, func, args_iterator, process_count, thread_count, | 197 def _RunApply(self, func, args_iterator, process_count, thread_count, |
| 177 command_inst=None, shared_attrs=None, fail_on_error=False, | 198 command_inst=None, shared_attrs=None, fail_on_error=False, |
| 178 thr_exc_handler=None, arg_checker=DummyArgChecker): | 199 thr_exc_handler=None, arg_checker=DummyArgChecker): |
| 179 command_inst = command_inst or self.command_class(True) | 200 command_inst = command_inst or self.command_class(True) |
| 180 exception_handler = thr_exc_handler or _ExceptionHandler | 201 exception_handler = thr_exc_handler or _ExceptionHandler |
| 202 |
| 181 return command_inst.Apply(func, args_iterator, exception_handler, | 203 return command_inst.Apply(func, args_iterator, exception_handler, |
| 182 thread_count=thread_count, | 204 thread_count=thread_count, |
| 183 process_count=process_count, | 205 process_count=process_count, |
| 184 arg_checker=arg_checker, | 206 arg_checker=arg_checker, |
| 185 should_return_results=True, | 207 should_return_results=True, |
| 186 shared_attrs=shared_attrs, | 208 shared_attrs=shared_attrs, |
| 187 fail_on_error=fail_on_error) | 209 fail_on_error=fail_on_error) |
| 188 | 210 |
| 189 def testBasicApplySingleProcessSingleThread(self): | 211 def testBasicApplySingleProcessSingleThread(self): |
| 190 self._TestBasicApply(1, 1) | 212 self._TestBasicApply(1, 1) |
| (...skipping 25 matching lines...) Expand all Loading... |
| 216 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') | 238 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') |
| 217 def testIteratorFailureMultiProcessSingleThread(self): | 239 def testIteratorFailureMultiProcessSingleThread(self): |
| 218 self._TestIteratorFailure(10, 1) | 240 self._TestIteratorFailure(10, 1) |
| 219 | 241 |
| 220 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') | 242 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') |
| 221 def testIteratorFailureMultiProcessMultiThread(self): | 243 def testIteratorFailureMultiProcessMultiThread(self): |
| 222 self._TestIteratorFailure(10, 10) | 244 self._TestIteratorFailure(10, 10) |
| 223 | 245 |
| 224 @Timeout | 246 @Timeout |
| 225 def _TestIteratorFailure(self, process_count, thread_count): | 247 def _TestIteratorFailure(self, process_count, thread_count): |
| 248 """Tests apply with a failing iterator.""" |
| 226 # Tests for fail_on_error == False. | 249 # Tests for fail_on_error == False. |
| 227 | 250 |
| 228 args = FailingIterator(10, [0]) | 251 args = FailingIterator(10, [0]) |
| 229 results = self._RunApply(_ReturnOneValue, args, process_count, thread_count) | 252 results = self._RunApply(_ReturnOneValue, args, process_count, thread_count) |
| 230 self.assertEqual(9, len(results)) | 253 self.assertEqual(9, len(results)) |
| 231 | 254 |
| 232 args = FailingIterator(10, [5]) | 255 args = FailingIterator(10, [5]) |
| 233 results = self._RunApply(_ReturnOneValue, args, process_count, thread_count) | 256 results = self._RunApply(_ReturnOneValue, args, process_count, thread_count) |
| 234 self.assertEqual(9, len(results)) | 257 self.assertEqual(9, len(results)) |
| 235 | 258 |
| 236 args = FailingIterator(10, [9]) | 259 args = FailingIterator(10, [9]) |
| 237 results = self._RunApply(_ReturnOneValue, args, process_count, thread_count) | 260 results = self._RunApply(_ReturnOneValue, args, process_count, thread_count) |
| 238 self.assertEqual(9, len(results)) | 261 self.assertEqual(9, len(results)) |
| 239 | 262 |
| 240 if process_count * thread_count > 1: | 263 if process_count * thread_count > 1: |
| 241 # In this case, we should ignore the fail_on_error flag. | 264 # In this case, we should ignore the fail_on_error flag. |
| 242 args = FailingIterator(10, [9]) | 265 args = FailingIterator(10, [9]) |
| 243 results = self._RunApply(_ReturnOneValue, args, process_count, | 266 results = self._RunApply(_ReturnOneValue, args, process_count, |
| 244 thread_count, fail_on_error=True) | 267 thread_count, fail_on_error=True) |
| 245 self.assertEqual(9, len(results)) | 268 self.assertEqual(9, len(results)) |
| 246 | 269 |
| 247 args = FailingIterator(10, range(10)) | 270 args = FailingIterator(10, range(10)) |
| 248 results = self._RunApply(_ReturnOneValue, args, process_count, thread_count) | 271 results = self._RunApply(_ReturnOneValue, args, process_count, thread_count) |
| 249 self.assertEqual(0, len(results)) | 272 self.assertEqual(0, len(results)) |
| 250 | 273 |
| 251 args = FailingIterator(0, []) | 274 args = FailingIterator(0, []) |
| 252 results = self._RunApply(_ReturnOneValue, args, process_count, thread_count) | 275 results = self._RunApply(_ReturnOneValue, args, process_count, thread_count) |
| 253 self.assertEqual(0, len(results)) | 276 self.assertEqual(0, len(results)) |
| 254 | 277 |
| 255 def testTestSharedAttrsWorkSingleProcessSingleThread(self): | 278 def testTestSharedAttrsWorkSingleProcessSingleThread(self): |
| 256 self._TestSharedAttrsWork(1, 1) | 279 self._TestSharedAttrsWork(1, 1) |
| 257 | 280 |
| 258 def testTestSharedAttrsWorkSingleProcessMultiThread(self): | 281 def testTestSharedAttrsWorkSingleProcessMultiThread(self): |
| 259 self._TestSharedAttrsWork(1, 10) | 282 self._TestSharedAttrsWork(1, 10) |
| 260 | 283 |
| 261 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') | 284 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') |
| 262 def testTestSharedAttrsWorkMultiProcessSingleThread(self): | 285 def testTestSharedAttrsWorkMultiProcessSingleThread(self): |
| 263 self._TestSharedAttrsWork(10, 1) | 286 self._TestSharedAttrsWork(10, 1) |
| 264 | 287 |
| 265 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') | 288 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') |
| 266 def testTestSharedAttrsWorkMultiProcessMultiThread(self): | 289 def testTestSharedAttrsWorkMultiProcessMultiThread(self): |
| 267 self._TestSharedAttrsWork(10, 10) | 290 self._TestSharedAttrsWork(10, 10) |
| 268 | 291 |
| 269 @Timeout | 292 @Timeout |
| 270 def _TestSharedAttrsWork(self, process_count, thread_count): | 293 def _TestSharedAttrsWork(self, process_count, thread_count): |
| 294 """Tests that Apply successfully uses shared_attrs.""" |
| 271 command_inst = self.command_class(True) | 295 command_inst = self.command_class(True) |
| 272 command_inst.arg_length_sum = 19 | 296 command_inst.arg_length_sum = 19 |
| 273 args = ['foo', ['bar', 'baz'], [], ['x', 'y'], [], 'abcd'] | 297 args = ['foo', ['bar', 'baz'], [], ['x', 'y'], [], 'abcd'] |
| 274 self._RunApply(_IncrementByLength, args, process_count, | 298 self._RunApply(_IncrementByLength, args, process_count, |
| 275 thread_count, command_inst=command_inst, | 299 thread_count, command_inst=command_inst, |
| 276 shared_attrs=['arg_length_sum']) | 300 shared_attrs=['arg_length_sum']) |
| 277 expected_sum = 19 | 301 expected_sum = 19 |
| 278 for arg in args: | 302 for arg in args: |
| 279 expected_sum += len(arg) | 303 expected_sum += len(arg) |
| 280 self.assertEqual(expected_sum, command_inst.arg_length_sum) | 304 self.assertEqual(expected_sum, command_inst.arg_length_sum) |
| 281 | 305 |
| 282 # Test that shared variables work when the iterator fails. | 306 # Test that shared variables work when the iterator fails. |
| 283 command_inst = self.command_class(True) | 307 command_inst = self.command_class(True) |
| 284 args = FailingIterator(10, [1, 3, 5]) | 308 args = FailingIterator(10, [1, 3, 5]) |
| 285 self._RunApply(_ReturnOneValue, args, process_count, thread_count, | 309 self._RunApply(_ReturnOneValue, args, process_count, thread_count, |
| 286 command_inst=command_inst, shared_attrs=['failure_count']) | 310 command_inst=command_inst, shared_attrs=['failure_count']) |
| 287 self.assertEqual(3, command_inst.failure_count) | 311 self.assertEqual(3, command_inst.failure_count) |
| 288 | 312 |
| 289 def testThreadsSurviveExceptionsInFuncSingleProcessSingleThread(self): | 313 def testThreadsSurviveExceptionsInFuncSingleProcessSingleThread(self): |
| 290 self._TestThreadsSurviveExceptionsInFunc(1, 1) | 314 self._TestThreadsSurviveExceptionsInFunc(1, 1) |
| 291 | 315 |
| 292 def testThreadsSurviveExceptionsInFuncSingleProcessMultiThread(self): | 316 def testThreadsSurviveExceptionsInFuncSingleProcessMultiThread(self): |
| 293 self._TestThreadsSurviveExceptionsInFunc(1, 10) | 317 self._TestThreadsSurviveExceptionsInFunc(1, 10) |
| 294 | 318 |
| 295 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') | 319 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') |
| 296 def testThreadsSurviveExceptionsInFuncMultiProcessSingleThread(self): | 320 def testThreadsSurviveExceptionsInFuncMultiProcessSingleThread(self): |
| 297 self._TestThreadsSurviveExceptionsInFunc(10, 1) | 321 self._TestThreadsSurviveExceptionsInFunc(10, 1) |
| 298 | 322 |
| (...skipping 28 matching lines...) Expand all Loading... |
| 327 def _TestThreadsSurviveExceptionsInHandler(self, process_count, thread_count): | 351 def _TestThreadsSurviveExceptionsInHandler(self, process_count, thread_count): |
| 328 command_inst = self.command_class(True) | 352 command_inst = self.command_class(True) |
| 329 args = ([()] * 5) | 353 args = ([()] * 5) |
| 330 self._RunApply(_FailureFunc, args, process_count, thread_count, | 354 self._RunApply(_FailureFunc, args, process_count, thread_count, |
| 331 command_inst=command_inst, shared_attrs=['failure_count'], | 355 command_inst=command_inst, shared_attrs=['failure_count'], |
| 332 thr_exc_handler=_FailingExceptionHandler) | 356 thr_exc_handler=_FailingExceptionHandler) |
| 333 self.assertEqual(len(args), command_inst.failure_count) | 357 self.assertEqual(len(args), command_inst.failure_count) |
| 334 | 358 |
| 335 @Timeout | 359 @Timeout |
| 336 def testFailOnErrorFlag(self): | 360 def testFailOnErrorFlag(self): |
| 361 """Tests that fail_on_error produces the correct exception on failure.""" |
| 337 def _ExpectCustomException(test_func): | 362 def _ExpectCustomException(test_func): |
| 338 try: | 363 try: |
| 339 test_func() | 364 test_func() |
| 340 self.fail( | 365 self.fail( |
| 341 'Setting fail_on_error should raise any exception encountered.') | 366 'Setting fail_on_error should raise any exception encountered.') |
| 342 except CustomException, e: | 367 except CustomException, e: |
| 343 pass | 368 pass |
| 344 except Exception, e: | 369 except Exception, e: |
| 345 self.fail("Got unexpected error: " + str(e)) | 370 self.fail('Got unexpected error: ' + str(e)) |
| 346 | 371 |
| 347 def _RunFailureFunc(): | 372 def _RunFailureFunc(): |
| 348 command_inst = self.command_class(True) | 373 command_inst = self.command_class(True) |
| 349 args = ([()] * 5) | 374 args = ([()] * 5) |
| 350 results = self._RunApply(_FailureFunc, args, 1, 1, | 375 self._RunApply(_FailureFunc, args, 1, 1, command_inst=command_inst, |
| 351 command_inst=command_inst, | 376 shared_attrs=['failure_count'], fail_on_error=True) |
| 352 shared_attrs=['failure_count'], | |
| 353 fail_on_error=True) | |
| 354 _ExpectCustomException(_RunFailureFunc) | 377 _ExpectCustomException(_RunFailureFunc) |
| 355 | 378 |
| 356 def _RunFailingIteratorFirstPosition(): | 379 def _RunFailingIteratorFirstPosition(): |
| 357 args = FailingIterator(10, [0]) | 380 args = FailingIterator(10, [0]) |
| 358 results = self._RunApply(_ReturnOneValue, args, 1, 1, fail_on_error=True) | 381 results = self._RunApply(_ReturnOneValue, args, 1, 1, fail_on_error=True) |
| 359 self.assertEqual(0, len(results)) | 382 self.assertEqual(0, len(results)) |
| 360 _ExpectCustomException(_RunFailingIteratorFirstPosition) | 383 _ExpectCustomException(_RunFailingIteratorFirstPosition) |
| 361 | 384 |
| 362 def _RunFailingIteratorPositionMiddlePosition(): | 385 def _RunFailingIteratorPositionMiddlePosition(): |
| 363 args = FailingIterator(10, [5]) | 386 args = FailingIterator(10, [5]) |
| (...skipping 19 matching lines...) Expand all Loading... |
| 383 def testRecursiveDepthThreeDifferentFunctionsSingleProcessMultiThread(self): | 406 def testRecursiveDepthThreeDifferentFunctionsSingleProcessMultiThread(self): |
| 384 self._TestRecursiveDepthThreeDifferentFunctions(1, 10) | 407 self._TestRecursiveDepthThreeDifferentFunctions(1, 10) |
| 385 | 408 |
| 386 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') | 409 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') |
| 387 def testRecursiveDepthThreeDifferentFunctionsMultiProcessSingleThread(self): | 410 def testRecursiveDepthThreeDifferentFunctionsMultiProcessSingleThread(self): |
| 388 self._TestRecursiveDepthThreeDifferentFunctions(10, 1) | 411 self._TestRecursiveDepthThreeDifferentFunctions(10, 1) |
| 389 | 412 |
| 390 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') | 413 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') |
| 391 def testRecursiveDepthThreeDifferentFunctionsMultiProcessMultiThread(self): | 414 def testRecursiveDepthThreeDifferentFunctionsMultiProcessMultiThread(self): |
| 392 self._TestRecursiveDepthThreeDifferentFunctions(10, 10) | 415 self._TestRecursiveDepthThreeDifferentFunctions(10, 10) |
| 393 | 416 |
| 394 @Timeout | 417 @Timeout |
| 395 def _TestRecursiveDepthThreeDifferentFunctions(self, process_count, | 418 def _TestRecursiveDepthThreeDifferentFunctions(self, process_count, |
| 396 thread_count): | 419 thread_count): |
| 397 """Calls Apply(A), where A calls Apply(B) followed by Apply(C) and B calls | 420 """Tests recursive application of Apply. |
| 398 Apply(C). | 421 |
| 422 Calls Apply(A), where A calls Apply(B) followed by Apply(C) and B calls |
| 423 Apply(C). |
| 424 |
| 425 Args: |
| 426 process_count: Number of processes to use. |
| 427 thread_count: Number of threads to use. |
| 399 """ | 428 """ |
| 400 args = ([3, 1, 4, 1, 5]) | 429 args = ([3, 1, 4, 1, 5]) |
| 401 results = self._RunApply(_ReApplyWithReplicatedArguments, args, | 430 results = self._RunApply(_ReApplyWithReplicatedArguments, args, |
| 402 process_count, thread_count) | 431 process_count, thread_count) |
| 403 self.assertEqual(7 * (sum(args) + len(args)), sum(results)) | 432 self.assertEqual(7 * (sum(args) + len(args)), sum(results)) |
| 404 | 433 |
| 405 def testExceptionInProducerRaisesAndTerminatesSingleProcessSingleThread(self): | 434 def testExceptionInProducerRaisesAndTerminatesSingleProcessSingleThread(self): |
| 406 self._TestExceptionInProducerRaisesAndTerminates(1, 1) | 435 self._TestExceptionInProducerRaisesAndTerminates(1, 1) |
| 407 | 436 |
| 408 def testExceptionInProducerRaisesAndTerminatesSingleProcessMultiThread(self): | 437 def testExceptionInProducerRaisesAndTerminatesSingleProcessMultiThread(self): |
| 409 self._TestExceptionInProducerRaisesAndTerminates(1, 10) | 438 self._TestExceptionInProducerRaisesAndTerminates(1, 10) |
| 410 | 439 |
| 411 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') | 440 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') |
| 412 def testExceptionInProducerRaisesAndTerminatesMultiProcessSingleThread(self): | 441 def testExceptionInProducerRaisesAndTerminatesMultiProcessSingleThread(self): |
| 413 self._TestExceptionInProducerRaisesAndTerminates(10, 1) | 442 self._TestExceptionInProducerRaisesAndTerminates(10, 1) |
| 414 | 443 |
| 415 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') | 444 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') |
| 416 def testExceptionInProducerRaisesAndTerminatesMultiProcessMultiThread(self): | 445 def testExceptionInProducerRaisesAndTerminatesMultiProcessMultiThread(self): |
| 417 self._TestExceptionInProducerRaisesAndTerminates(10, 10) | 446 self._TestExceptionInProducerRaisesAndTerminates(10, 10) |
| 418 | 447 |
| 419 @Timeout | 448 @Timeout |
| 420 def _TestExceptionInProducerRaisesAndTerminates(self, process_count, | 449 def _TestExceptionInProducerRaisesAndTerminates(self, process_count, |
| 421 thread_count): | 450 thread_count): |
| 422 args = self # The ProducerThread will try and fail to iterate over this. | 451 args = self # The ProducerThread will try and fail to iterate over this. |
| 423 try: | 452 try: |
| 424 self._RunApply(_ReturnOneValue, args, process_count, thread_count) | 453 self._RunApply(_ReturnOneValue, args, process_count, thread_count) |
| 425 self.fail("Did not throw expected exception.") | 454 self.fail('Did not raise expected exception.') |
| 426 except TypeError, e: | 455 except TypeError: |
| 427 pass | 456 pass |
| 428 | 457 |
| 429 def testSkippedArgumentsSingleThreadSingleProcess(self): | 458 def testSkippedArgumentsSingleThreadSingleProcess(self): |
| 430 self._TestSkippedArguments(1, 1) | 459 self._TestSkippedArguments(1, 1) |
| 431 | 460 |
| 432 def testSkippedArgumentsMultiThreadSingleProcess(self): | 461 def testSkippedArgumentsMultiThreadSingleProcess(self): |
| 433 self._TestSkippedArguments(1, 10) | 462 self._TestSkippedArguments(1, 10) |
| 434 | 463 |
| 464 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') |
| 435 def testSkippedArgumentsSingleThreadMultiProcess(self): | 465 def testSkippedArgumentsSingleThreadMultiProcess(self): |
| 436 self._TestSkippedArguments(10, 1) | 466 self._TestSkippedArguments(10, 1) |
| 437 | 467 |
| 468 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') |
| 438 def testSkippedArgumentsMultiThreadMultiProcess(self): | 469 def testSkippedArgumentsMultiThreadMultiProcess(self): |
| 439 self._TestSkippedArguments(10, 10) | 470 self._TestSkippedArguments(10, 10) |
| 440 | 471 |
| 441 @Timeout | 472 @Timeout |
| 442 def _TestSkippedArguments(self, process_count, thread_count): | 473 def _TestSkippedArguments(self, process_count, thread_count): |
| 443 | 474 |
| 444 # Skip a proper subset of the arguments. | 475 # Skip a proper subset of the arguments. |
| 445 n = 2 * process_count * thread_count | 476 n = 2 * process_count * thread_count |
| 446 args = range(1, n + 1) | 477 args = range(1, n + 1) |
| 447 results = self._RunApply(_ReturnOneValue, args, process_count, thread_count, | 478 results = self._RunApply(_ReturnOneValue, args, process_count, thread_count, |
| 448 arg_checker=_SkipEvenNumbersArgChecker) | 479 arg_checker=_SkipEvenNumbersArgChecker) |
| 449 self.assertEqual(n / 2, len(results)) # We know n is even. | 480 self.assertEqual(n / 2, len(results)) # We know n is even. |
| 450 self.assertEqual(n / 2, sum(results)) | 481 self.assertEqual(n / 2, sum(results)) |
| 451 | 482 |
| 452 # Skip all arguments. | 483 # Skip all arguments. |
| 453 args = [2 * x for x in args] | 484 args = [2 * x for x in args] |
| 454 results = self._RunApply(_ReturnOneValue, args, process_count, thread_count, | 485 results = self._RunApply(_ReturnOneValue, args, process_count, thread_count, |
| 455 arg_checker=_SkipEvenNumbersArgChecker) | 486 arg_checker=_SkipEvenNumbersArgChecker) |
| 456 self.assertEqual(0, len(results)) | 487 self.assertEqual(0, len(results)) |
| 457 | 488 |
| 458 | 489 |
| 459 class TestParallelismFrameworkWithoutMultiprocessing(TestParallelismFramework): | 490 class TestParallelismFrameworkWithoutMultiprocessing(TestParallelismFramework): |
| 460 """Tests that the parallelism framework works when the multiprocessing module | 491 """Tests parallelism framework works with multiprocessing module unavailable. |
| 461 is not available. Notably, this test has no way to override previous calls | 492 |
| 462 to gslib.util.MultiprocessingIsAvailable to prevent the initialization of | 493 Notably, this test has no way to override previous calls |
| 463 all of the global variables in command.py, so this still behaves slightly | 494 to gslib.util.MultiprocessingIsAvailable to prevent the initialization of |
| 464 differently than the behavior one would see on a machine where the | 495 all of the global variables in command.py, so this still behaves slightly |
| 465 multiprocessing functionality is actually not available (in particular, it | 496 differently than the behavior one would see on a machine where the |
| 466 will not catch the case where a global variable that is not available for | 497 multiprocessing functionality is actually not available (in particular, it |
| 467 the sequential path is referenced before initialization). | 498 will not catch the case where a global variable that is not available for |
| 499 the sequential path is referenced before initialization). |
| 468 """ | 500 """ |
| 469 command_class = FakeCommandWithoutMultiprocessingModule | 501 command_class = FakeCommandWithoutMultiprocessingModule |
| OLD | NEW |