Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(264)

Side by Side Diff: gslib/tests/test_parallelism_framework.py

Issue 698893003: Update checked in version of gsutil to version 4.6 (Closed) Base URL: http://dart.googlecode.com/svn/third_party/gsutil/
Patch Set: Created 6 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « gslib/tests/test_parallel_cp.py ('k') | gslib/tests/test_perfdiag.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 # -*- coding: utf-8 -*-
1 # Copyright 2013 Google Inc. All Rights Reserved. 2 # Copyright 2013 Google Inc. All Rights Reserved.
2 # 3 #
3 # Permission is hereby granted, free of charge, to any person obtaining a 4 # Permission is hereby granted, free of charge, to any person obtaining a
4 # copy of this software and associated documentation files (the 5 # copy of this software and associated documentation files (the
5 # "Software"), to deal in the Software without restriction, including 6 # "Software"), to deal in the Software without restriction, including
6 # without limitation the rights to use, copy, modify, merge, publish, dis- 7 # without limitation the rights to use, copy, modify, merge, publish, dis-
7 # tribute, sublicense, and/or sell copies of the Software, and to permit 8 # tribute, sublicense, and/or sell copies of the Software, and to permit
8 # persons to whom the Software is furnished to do so, subject to the fol- 9 # persons to whom the Software is furnished to do so, subject to the fol-
9 # lowing conditions: 10 # lowing conditions:
10 # 11 #
11 # The above copyright notice and this permission notice shall be included 12 # The above copyright notice and this permission notice shall be included
12 # in all copies or substantial portions of the Software. 13 # in all copies or substantial portions of the Software.
13 # 14 #
14 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS 15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- 16 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
16 # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT 17 # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
17 # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, 18 # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
18 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 19 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 20 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
20 # IN THE SOFTWARE. 21 # IN THE SOFTWARE.
21
22 """Unit tests for gsutil parallelism framework.""" 22 """Unit tests for gsutil parallelism framework."""
23 23
24 from __future__ import absolute_import
25
24 import functools 26 import functools
25 import gslib.help_provider as help_provider
26 import gslib.tests.testcase as testcase
27 import signal 27 import signal
28 28
29 from boto.storage_uri import BucketStorageUri
30 from gslib import cs_api_map
29 from gslib.command import Command 31 from gslib.command import Command
30 from gslib.command import COMMAND_NAME 32 from gslib.command import CreateGsutilLogger
31 from gslib.command import COMMAND_NAME_ALIASES
32 from gslib.command import DummyArgChecker 33 from gslib.command import DummyArgChecker
33 from gslib.command import CreateGsutilLogger 34 import gslib.tests.testcase as testcase
34 from gslib.commands.lifecycle import LifecycleCommand
35 from gslib.help_provider import HELP_NAME
36 from gslib.help_provider import HELP_NAME_ALIASES
37 from gslib.help_provider import HELP_ONE_LINE_SUMMARY
38 from gslib.help_provider import HELP_TEXT
39 from gslib.help_provider import HelpType
40 from gslib.help_provider import HELP_TYPE
41 from gslib.tests.util import unittest 35 from gslib.tests.util import unittest
42 from gslib.util import IS_WINDOWS 36 from gslib.util import IS_WINDOWS
43 from gslib.util import MultiprocessingIsAvailable 37 from gslib.util import MultiprocessingIsAvailable
44 38
45 39
46 def Timeout(func): 40 def Timeout(func):
47 """Decorator used to provide a timeout for functions.""" 41 """Decorator used to provide a timeout for functions."""
48 @functools.wraps(func) 42 @functools.wraps(func)
49 def wrapper(*args, **kwargs): 43 def Wrapper(*args, **kwargs):
50 if not IS_WINDOWS: 44 if not IS_WINDOWS:
51 signal.signal(signal.SIGALRM, _HandleAlarm) 45 signal.signal(signal.SIGALRM, _HandleAlarm)
52 signal.alarm(5) 46 signal.alarm(5)
53 try: 47 try:
54 func(*args, **kwargs) 48 func(*args, **kwargs)
55 finally: 49 finally:
56 if not IS_WINDOWS: 50 if not IS_WINDOWS:
57 signal.alarm(0) # Cancel the alarm. 51 signal.alarm(0) # Cancel the alarm.
58 return wrapper 52 return Wrapper
59 53
54
55 # pylint: disable=unused-argument
60 def _HandleAlarm(signal_num, cur_stack_frame): 56 def _HandleAlarm(signal_num, cur_stack_frame):
61 raise Exception("Test timed out.") 57 raise Exception('Test timed out.')
58
62 59
63 class CustomException(Exception): 60 class CustomException(Exception):
61
64 def __init__(self, exception_str): 62 def __init__(self, exception_str):
65 super(CustomException, self).__init__(exception_str) 63 super(CustomException, self).__init__(exception_str)
66 64
67 65
68 def _ReturnOneValue(cls, args): 66 def _ReturnOneValue(cls, args, thread_state=None):
69 return 1 67 return 1
70 68
71 def _FailureFunc(cls, args): 69
72 raise CustomException("Failing on purpose.") 70 def _FailureFunc(cls, args, thread_state=None):
71 raise CustomException('Failing on purpose.')
72
73 73
74 def _FailingExceptionHandler(cls, e): 74 def _FailingExceptionHandler(cls, e):
75 cls.failure_count += 1 75 cls.failure_count += 1
76 raise CustomException("Exception handler failing on purpose.") 76 raise CustomException('Exception handler failing on purpose.')
77
77 78
78 def _ExceptionHandler(cls, e): 79 def _ExceptionHandler(cls, e):
79 cls.logger.exception(e) 80 cls.logger.exception(e)
80 cls.failure_count += 1 81 cls.failure_count += 1
81 82
82 def _IncrementByLength(cls, args): 83
84 def _IncrementByLength(cls, args, thread_state=None):
83 cls.arg_length_sum += len(args) 85 cls.arg_length_sum += len(args)
84 86
87
85 def _AdjustProcessCountIfWindows(process_count): 88 def _AdjustProcessCountIfWindows(process_count):
86 if IS_WINDOWS: 89 if IS_WINDOWS:
87 return 1 90 return 1
88 else: 91 else:
89 return process_count 92 return process_count
90 93
91 94
92 def _ReApplyWithReplicatedArguments(cls, args): 95 def _ReApplyWithReplicatedArguments(cls, args, thread_state=None):
96 """Calls Apply with arguments repeated seven times."""
93 new_args = [args] * 7 97 new_args = [args] * 7
94 process_count = _AdjustProcessCountIfWindows(2) 98 process_count = _AdjustProcessCountIfWindows(2)
95 return_values = cls.Apply(_PerformNRecursiveCalls, new_args, 99 return_values = cls.Apply(_PerformNRecursiveCalls, new_args,
96 _ExceptionHandler, arg_checker=DummyArgChecker, 100 _ExceptionHandler, arg_checker=DummyArgChecker,
97 process_count=process_count, thread_count=2, 101 process_count=process_count, thread_count=2,
98 should_return_results=True) 102 should_return_results=True)
99 ret = sum(return_values) 103 ret = sum(return_values)
100 104
101 return_values = cls.Apply(_ReturnOneValue, new_args, 105 return_values = cls.Apply(_ReturnOneValue, new_args,
102 _ExceptionHandler, arg_checker=DummyArgChecker, 106 _ExceptionHandler, arg_checker=DummyArgChecker,
103 process_count=process_count, thread_count=2, 107 process_count=process_count, thread_count=2,
104 should_return_results=True) 108 should_return_results=True)
105 109
106 return len(return_values) + ret 110 return len(return_values) + ret
107 111
108 112
109 def _PerformNRecursiveCalls(cls, args): 113 def _PerformNRecursiveCalls(cls, args, thread_state=None):
110 process_count = _AdjustProcessCountIfWindows(2) 114 process_count = _AdjustProcessCountIfWindows(2)
111 return_values = cls.Apply(_ReturnOneValue, [()] * args, _ExceptionHandler, 115 return_values = cls.Apply(_ReturnOneValue, [()] * args, _ExceptionHandler,
112 arg_checker=DummyArgChecker, 116 arg_checker=DummyArgChecker,
113 process_count=process_count, thread_count=2, 117 process_count=process_count, thread_count=2,
114 should_return_results=True) 118 should_return_results=True)
115 return len(return_values) 119 return len(return_values)
116 120
117 121
118 def _SkipEvenNumbersArgChecker(cls, arg): 122 def _SkipEvenNumbersArgChecker(cls, arg):
119 return arg % 2 != 0 123 return arg % 2 != 0
120 124
121 125
122 class FailingIterator(object): 126 class FailingIterator(object):
127
123 def __init__(self, size, failure_indices): 128 def __init__(self, size, failure_indices):
124 self.size = size 129 self.size = size
125 self.failure_indices = failure_indices 130 self.failure_indices = failure_indices
126 self.current_index = 0 131 self.current_index = 0
127 132
128 def __iter__(self): 133 def __iter__(self):
129 return self 134 return self
130 135
131 def next(self): 136 def next(self):
132 if self.current_index == self.size: 137 if self.current_index == self.size:
133 raise StopIteration('') 138 raise StopIteration('')
134 elif self.current_index in self.failure_indices: 139 elif self.current_index in self.failure_indices:
135 self.current_index += 1 140 self.current_index += 1
136 raise CustomException( 141 raise CustomException(
137 'Iterator failing on purpose at index %d.' % self.current_index) 142 'Iterator failing on purpose at index %d.' % self.current_index)
138 else: 143 else:
139 self.current_index += 1 144 self.current_index += 1
140 return self.current_index - 1 145 return self.current_index - 1
141 146
142 147
143 class FakeCommand(Command): 148 class FakeCommand(Command):
144 command_spec = { 149 """Fake command class for overriding command instance state."""
145 COMMAND_NAME : 'fake', 150 command_spec = Command.CreateCommandSpec(
146 COMMAND_NAME_ALIASES : [], 151 'fake',
147 } 152 command_name_aliases=[],
148 help_spec = { 153 )
149 HELP_NAME : 'fake', 154 # Help specification. See help_provider.py for documentation.
150 HELP_NAME_ALIASES : [], 155 help_spec = Command.HelpSpec(
151 HELP_TYPE : HelpType.COMMAND_HELP, 156 help_name='fake',
152 HELP_ONE_LINE_SUMMARY : 'Something to take up space.', 157 help_name_aliases=[],
153 HELP_TEXT : 'Something else to take up space.', 158 help_type='command_help',
154 } 159 help_one_line_summary='Something to take up space.',
160 help_text='Something else to take up space.',
161 subcommand_help_text={},
162 )
155 163
156 def __init__(self, do_parallel): 164 def __init__(self, do_parallel):
165 self.bucket_storage_uri_class = BucketStorageUri
166 support_map = {
167 'gs': ['JSON'],
168 's3': ['XML']
169 }
170 default_map = {
171 'gs': 'JSON',
172 's3': 'XML'
173 }
174 self.gsutil_api_map = cs_api_map.GsutilApiMapFactory.GetApiMap(
175 cs_api_map.GsutilApiClassMapFactory, support_map, default_map)
157 self.logger = CreateGsutilLogger('FakeCommand') 176 self.logger = CreateGsutilLogger('FakeCommand')
158 self.parallel_operations = do_parallel 177 self.parallel_operations = do_parallel
159 self.failure_count = 0 178 self.failure_count = 0
160 self.multiprocessing_is_available = MultiprocessingIsAvailable()[0] 179 self.multiprocessing_is_available = MultiprocessingIsAvailable()[0]
180 self.debug = 0
161 181
162 182
163 class FakeCommandWithoutMultiprocessingModule(FakeCommand): 183 class FakeCommandWithoutMultiprocessingModule(FakeCommand):
184
164 def __init__(self, do_parallel): 185 def __init__(self, do_parallel):
165 super(FakeCommandWithoutMultiprocessingModule, self).__init__(do_parallel) 186 super(FakeCommandWithoutMultiprocessingModule, self).__init__(do_parallel)
166 self.multiprocessing_is_available = False 187 self.multiprocessing_is_available = False
167 188
168 189
169 # TODO: Figure out a good way to test that ctrl+C really stops execution, 190 # TODO: Figure out a good way to test that ctrl+C really stops execution,
170 # and also that ctrl+C works when there are still tasks enqueued. 191 # and also that ctrl+C works when there are still tasks enqueued.
171 class TestParallelismFramework(testcase.GsUtilUnitTestCase): 192 class TestParallelismFramework(testcase.GsUtilUnitTestCase):
172 """gsutil parallelism framework test suite.""" 193 """gsutil parallelism framework test suite."""
173 194
174 command_class = FakeCommand 195 command_class = FakeCommand
175 196
176 def _RunApply(self, func, args_iterator, process_count, thread_count, 197 def _RunApply(self, func, args_iterator, process_count, thread_count,
177 command_inst=None, shared_attrs=None, fail_on_error=False, 198 command_inst=None, shared_attrs=None, fail_on_error=False,
178 thr_exc_handler=None, arg_checker=DummyArgChecker): 199 thr_exc_handler=None, arg_checker=DummyArgChecker):
179 command_inst = command_inst or self.command_class(True) 200 command_inst = command_inst or self.command_class(True)
180 exception_handler = thr_exc_handler or _ExceptionHandler 201 exception_handler = thr_exc_handler or _ExceptionHandler
202
181 return command_inst.Apply(func, args_iterator, exception_handler, 203 return command_inst.Apply(func, args_iterator, exception_handler,
182 thread_count=thread_count, 204 thread_count=thread_count,
183 process_count=process_count, 205 process_count=process_count,
184 arg_checker=arg_checker, 206 arg_checker=arg_checker,
185 should_return_results=True, 207 should_return_results=True,
186 shared_attrs=shared_attrs, 208 shared_attrs=shared_attrs,
187 fail_on_error=fail_on_error) 209 fail_on_error=fail_on_error)
188 210
189 def testBasicApplySingleProcessSingleThread(self): 211 def testBasicApplySingleProcessSingleThread(self):
190 self._TestBasicApply(1, 1) 212 self._TestBasicApply(1, 1)
(...skipping 25 matching lines...) Expand all
216 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') 238 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
217 def testIteratorFailureMultiProcessSingleThread(self): 239 def testIteratorFailureMultiProcessSingleThread(self):
218 self._TestIteratorFailure(10, 1) 240 self._TestIteratorFailure(10, 1)
219 241
220 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') 242 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
221 def testIteratorFailureMultiProcessMultiThread(self): 243 def testIteratorFailureMultiProcessMultiThread(self):
222 self._TestIteratorFailure(10, 10) 244 self._TestIteratorFailure(10, 10)
223 245
224 @Timeout 246 @Timeout
225 def _TestIteratorFailure(self, process_count, thread_count): 247 def _TestIteratorFailure(self, process_count, thread_count):
248 """Tests apply with a failing iterator."""
226 # Tests for fail_on_error == False. 249 # Tests for fail_on_error == False.
227 250
228 args = FailingIterator(10, [0]) 251 args = FailingIterator(10, [0])
229 results = self._RunApply(_ReturnOneValue, args, process_count, thread_count) 252 results = self._RunApply(_ReturnOneValue, args, process_count, thread_count)
230 self.assertEqual(9, len(results)) 253 self.assertEqual(9, len(results))
231 254
232 args = FailingIterator(10, [5]) 255 args = FailingIterator(10, [5])
233 results = self._RunApply(_ReturnOneValue, args, process_count, thread_count) 256 results = self._RunApply(_ReturnOneValue, args, process_count, thread_count)
234 self.assertEqual(9, len(results)) 257 self.assertEqual(9, len(results))
235 258
236 args = FailingIterator(10, [9]) 259 args = FailingIterator(10, [9])
237 results = self._RunApply(_ReturnOneValue, args, process_count, thread_count) 260 results = self._RunApply(_ReturnOneValue, args, process_count, thread_count)
238 self.assertEqual(9, len(results)) 261 self.assertEqual(9, len(results))
239 262
240 if process_count * thread_count > 1: 263 if process_count * thread_count > 1:
241 # In this case, we should ignore the fail_on_error flag. 264 # In this case, we should ignore the fail_on_error flag.
242 args = FailingIterator(10, [9]) 265 args = FailingIterator(10, [9])
243 results = self._RunApply(_ReturnOneValue, args, process_count, 266 results = self._RunApply(_ReturnOneValue, args, process_count,
244 thread_count, fail_on_error=True) 267 thread_count, fail_on_error=True)
245 self.assertEqual(9, len(results)) 268 self.assertEqual(9, len(results))
246 269
247 args = FailingIterator(10, range(10)) 270 args = FailingIterator(10, range(10))
248 results = self._RunApply(_ReturnOneValue, args, process_count, thread_count) 271 results = self._RunApply(_ReturnOneValue, args, process_count, thread_count)
249 self.assertEqual(0, len(results)) 272 self.assertEqual(0, len(results))
250 273
251 args = FailingIterator(0, []) 274 args = FailingIterator(0, [])
252 results = self._RunApply(_ReturnOneValue, args, process_count, thread_count) 275 results = self._RunApply(_ReturnOneValue, args, process_count, thread_count)
253 self.assertEqual(0, len(results)) 276 self.assertEqual(0, len(results))
254 277
255 def testTestSharedAttrsWorkSingleProcessSingleThread(self): 278 def testTestSharedAttrsWorkSingleProcessSingleThread(self):
256 self._TestSharedAttrsWork(1, 1) 279 self._TestSharedAttrsWork(1, 1)
257 280
258 def testTestSharedAttrsWorkSingleProcessMultiThread(self): 281 def testTestSharedAttrsWorkSingleProcessMultiThread(self):
259 self._TestSharedAttrsWork(1, 10) 282 self._TestSharedAttrsWork(1, 10)
260 283
261 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') 284 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
262 def testTestSharedAttrsWorkMultiProcessSingleThread(self): 285 def testTestSharedAttrsWorkMultiProcessSingleThread(self):
263 self._TestSharedAttrsWork(10, 1) 286 self._TestSharedAttrsWork(10, 1)
264 287
265 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') 288 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
266 def testTestSharedAttrsWorkMultiProcessMultiThread(self): 289 def testTestSharedAttrsWorkMultiProcessMultiThread(self):
267 self._TestSharedAttrsWork(10, 10) 290 self._TestSharedAttrsWork(10, 10)
268 291
269 @Timeout 292 @Timeout
270 def _TestSharedAttrsWork(self, process_count, thread_count): 293 def _TestSharedAttrsWork(self, process_count, thread_count):
294 """Tests that Apply successfully uses shared_attrs."""
271 command_inst = self.command_class(True) 295 command_inst = self.command_class(True)
272 command_inst.arg_length_sum = 19 296 command_inst.arg_length_sum = 19
273 args = ['foo', ['bar', 'baz'], [], ['x', 'y'], [], 'abcd'] 297 args = ['foo', ['bar', 'baz'], [], ['x', 'y'], [], 'abcd']
274 self._RunApply(_IncrementByLength, args, process_count, 298 self._RunApply(_IncrementByLength, args, process_count,
275 thread_count, command_inst=command_inst, 299 thread_count, command_inst=command_inst,
276 shared_attrs=['arg_length_sum']) 300 shared_attrs=['arg_length_sum'])
277 expected_sum = 19 301 expected_sum = 19
278 for arg in args: 302 for arg in args:
279 expected_sum += len(arg) 303 expected_sum += len(arg)
280 self.assertEqual(expected_sum, command_inst.arg_length_sum) 304 self.assertEqual(expected_sum, command_inst.arg_length_sum)
281 305
282 # Test that shared variables work when the iterator fails. 306 # Test that shared variables work when the iterator fails.
283 command_inst = self.command_class(True) 307 command_inst = self.command_class(True)
284 args = FailingIterator(10, [1, 3, 5]) 308 args = FailingIterator(10, [1, 3, 5])
285 self._RunApply(_ReturnOneValue, args, process_count, thread_count, 309 self._RunApply(_ReturnOneValue, args, process_count, thread_count,
286 command_inst=command_inst, shared_attrs=['failure_count']) 310 command_inst=command_inst, shared_attrs=['failure_count'])
287 self.assertEqual(3, command_inst.failure_count) 311 self.assertEqual(3, command_inst.failure_count)
288 312
289 def testThreadsSurviveExceptionsInFuncSingleProcessSingleThread(self): 313 def testThreadsSurviveExceptionsInFuncSingleProcessSingleThread(self):
290 self._TestThreadsSurviveExceptionsInFunc(1, 1) 314 self._TestThreadsSurviveExceptionsInFunc(1, 1)
291 315
292 def testThreadsSurviveExceptionsInFuncSingleProcessMultiThread(self): 316 def testThreadsSurviveExceptionsInFuncSingleProcessMultiThread(self):
293 self._TestThreadsSurviveExceptionsInFunc(1, 10) 317 self._TestThreadsSurviveExceptionsInFunc(1, 10)
294 318
295 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') 319 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
296 def testThreadsSurviveExceptionsInFuncMultiProcessSingleThread(self): 320 def testThreadsSurviveExceptionsInFuncMultiProcessSingleThread(self):
297 self._TestThreadsSurviveExceptionsInFunc(10, 1) 321 self._TestThreadsSurviveExceptionsInFunc(10, 1)
298 322
(...skipping 28 matching lines...) Expand all
327 def _TestThreadsSurviveExceptionsInHandler(self, process_count, thread_count): 351 def _TestThreadsSurviveExceptionsInHandler(self, process_count, thread_count):
328 command_inst = self.command_class(True) 352 command_inst = self.command_class(True)
329 args = ([()] * 5) 353 args = ([()] * 5)
330 self._RunApply(_FailureFunc, args, process_count, thread_count, 354 self._RunApply(_FailureFunc, args, process_count, thread_count,
331 command_inst=command_inst, shared_attrs=['failure_count'], 355 command_inst=command_inst, shared_attrs=['failure_count'],
332 thr_exc_handler=_FailingExceptionHandler) 356 thr_exc_handler=_FailingExceptionHandler)
333 self.assertEqual(len(args), command_inst.failure_count) 357 self.assertEqual(len(args), command_inst.failure_count)
334 358
335 @Timeout 359 @Timeout
336 def testFailOnErrorFlag(self): 360 def testFailOnErrorFlag(self):
361 """Tests that fail_on_error produces the correct exception on failure."""
337 def _ExpectCustomException(test_func): 362 def _ExpectCustomException(test_func):
338 try: 363 try:
339 test_func() 364 test_func()
340 self.fail( 365 self.fail(
341 'Setting fail_on_error should raise any exception encountered.') 366 'Setting fail_on_error should raise any exception encountered.')
342 except CustomException, e: 367 except CustomException, e:
343 pass 368 pass
344 except Exception, e: 369 except Exception, e:
345 self.fail("Got unexpected error: " + str(e)) 370 self.fail('Got unexpected error: ' + str(e))
346 371
347 def _RunFailureFunc(): 372 def _RunFailureFunc():
348 command_inst = self.command_class(True) 373 command_inst = self.command_class(True)
349 args = ([()] * 5) 374 args = ([()] * 5)
350 results = self._RunApply(_FailureFunc, args, 1, 1, 375 self._RunApply(_FailureFunc, args, 1, 1, command_inst=command_inst,
351 command_inst=command_inst, 376 shared_attrs=['failure_count'], fail_on_error=True)
352 shared_attrs=['failure_count'],
353 fail_on_error=True)
354 _ExpectCustomException(_RunFailureFunc) 377 _ExpectCustomException(_RunFailureFunc)
355 378
356 def _RunFailingIteratorFirstPosition(): 379 def _RunFailingIteratorFirstPosition():
357 args = FailingIterator(10, [0]) 380 args = FailingIterator(10, [0])
358 results = self._RunApply(_ReturnOneValue, args, 1, 1, fail_on_error=True) 381 results = self._RunApply(_ReturnOneValue, args, 1, 1, fail_on_error=True)
359 self.assertEqual(0, len(results)) 382 self.assertEqual(0, len(results))
360 _ExpectCustomException(_RunFailingIteratorFirstPosition) 383 _ExpectCustomException(_RunFailingIteratorFirstPosition)
361 384
362 def _RunFailingIteratorPositionMiddlePosition(): 385 def _RunFailingIteratorPositionMiddlePosition():
363 args = FailingIterator(10, [5]) 386 args = FailingIterator(10, [5])
(...skipping 19 matching lines...) Expand all
383 def testRecursiveDepthThreeDifferentFunctionsSingleProcessMultiThread(self): 406 def testRecursiveDepthThreeDifferentFunctionsSingleProcessMultiThread(self):
384 self._TestRecursiveDepthThreeDifferentFunctions(1, 10) 407 self._TestRecursiveDepthThreeDifferentFunctions(1, 10)
385 408
386 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') 409 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
387 def testRecursiveDepthThreeDifferentFunctionsMultiProcessSingleThread(self): 410 def testRecursiveDepthThreeDifferentFunctionsMultiProcessSingleThread(self):
388 self._TestRecursiveDepthThreeDifferentFunctions(10, 1) 411 self._TestRecursiveDepthThreeDifferentFunctions(10, 1)
389 412
390 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') 413 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
391 def testRecursiveDepthThreeDifferentFunctionsMultiProcessMultiThread(self): 414 def testRecursiveDepthThreeDifferentFunctionsMultiProcessMultiThread(self):
392 self._TestRecursiveDepthThreeDifferentFunctions(10, 10) 415 self._TestRecursiveDepthThreeDifferentFunctions(10, 10)
393 416
394 @Timeout 417 @Timeout
395 def _TestRecursiveDepthThreeDifferentFunctions(self, process_count, 418 def _TestRecursiveDepthThreeDifferentFunctions(self, process_count,
396 thread_count): 419 thread_count):
397 """Calls Apply(A), where A calls Apply(B) followed by Apply(C) and B calls 420 """Tests recursive application of Apply.
398 Apply(C). 421
422 Calls Apply(A), where A calls Apply(B) followed by Apply(C) and B calls
423 Apply(C).
424
425 Args:
426 process_count: Number of processes to use.
427 thread_count: Number of threads to use.
399 """ 428 """
400 args = ([3, 1, 4, 1, 5]) 429 args = ([3, 1, 4, 1, 5])
401 results = self._RunApply(_ReApplyWithReplicatedArguments, args, 430 results = self._RunApply(_ReApplyWithReplicatedArguments, args,
402 process_count, thread_count) 431 process_count, thread_count)
403 self.assertEqual(7 * (sum(args) + len(args)), sum(results)) 432 self.assertEqual(7 * (sum(args) + len(args)), sum(results))
404 433
405 def testExceptionInProducerRaisesAndTerminatesSingleProcessSingleThread(self): 434 def testExceptionInProducerRaisesAndTerminatesSingleProcessSingleThread(self):
406 self._TestExceptionInProducerRaisesAndTerminates(1, 1) 435 self._TestExceptionInProducerRaisesAndTerminates(1, 1)
407 436
408 def testExceptionInProducerRaisesAndTerminatesSingleProcessMultiThread(self): 437 def testExceptionInProducerRaisesAndTerminatesSingleProcessMultiThread(self):
409 self._TestExceptionInProducerRaisesAndTerminates(1, 10) 438 self._TestExceptionInProducerRaisesAndTerminates(1, 10)
410 439
411 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') 440 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
412 def testExceptionInProducerRaisesAndTerminatesMultiProcessSingleThread(self): 441 def testExceptionInProducerRaisesAndTerminatesMultiProcessSingleThread(self):
413 self._TestExceptionInProducerRaisesAndTerminates(10, 1) 442 self._TestExceptionInProducerRaisesAndTerminates(10, 1)
414 443
415 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') 444 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
416 def testExceptionInProducerRaisesAndTerminatesMultiProcessMultiThread(self): 445 def testExceptionInProducerRaisesAndTerminatesMultiProcessMultiThread(self):
417 self._TestExceptionInProducerRaisesAndTerminates(10, 10) 446 self._TestExceptionInProducerRaisesAndTerminates(10, 10)
418 447
419 @Timeout 448 @Timeout
420 def _TestExceptionInProducerRaisesAndTerminates(self, process_count, 449 def _TestExceptionInProducerRaisesAndTerminates(self, process_count,
421 thread_count): 450 thread_count):
422 args = self # The ProducerThread will try and fail to iterate over this. 451 args = self # The ProducerThread will try and fail to iterate over this.
423 try: 452 try:
424 self._RunApply(_ReturnOneValue, args, process_count, thread_count) 453 self._RunApply(_ReturnOneValue, args, process_count, thread_count)
425 self.fail("Did not throw expected exception.") 454 self.fail('Did not raise expected exception.')
426 except TypeError, e: 455 except TypeError:
427 pass 456 pass
428 457
429 def testSkippedArgumentsSingleThreadSingleProcess(self): 458 def testSkippedArgumentsSingleThreadSingleProcess(self):
430 self._TestSkippedArguments(1, 1) 459 self._TestSkippedArguments(1, 1)
431 460
432 def testSkippedArgumentsMultiThreadSingleProcess(self): 461 def testSkippedArgumentsMultiThreadSingleProcess(self):
433 self._TestSkippedArguments(1, 10) 462 self._TestSkippedArguments(1, 10)
434 463
464 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
435 def testSkippedArgumentsSingleThreadMultiProcess(self): 465 def testSkippedArgumentsSingleThreadMultiProcess(self):
436 self._TestSkippedArguments(10, 1) 466 self._TestSkippedArguments(10, 1)
437 467
468 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
438 def testSkippedArgumentsMultiThreadMultiProcess(self): 469 def testSkippedArgumentsMultiThreadMultiProcess(self):
439 self._TestSkippedArguments(10, 10) 470 self._TestSkippedArguments(10, 10)
440 471
441 @Timeout 472 @Timeout
442 def _TestSkippedArguments(self, process_count, thread_count): 473 def _TestSkippedArguments(self, process_count, thread_count):
443 474
444 # Skip a proper subset of the arguments. 475 # Skip a proper subset of the arguments.
445 n = 2 * process_count * thread_count 476 n = 2 * process_count * thread_count
446 args = range(1, n + 1) 477 args = range(1, n + 1)
447 results = self._RunApply(_ReturnOneValue, args, process_count, thread_count, 478 results = self._RunApply(_ReturnOneValue, args, process_count, thread_count,
448 arg_checker=_SkipEvenNumbersArgChecker) 479 arg_checker=_SkipEvenNumbersArgChecker)
449 self.assertEqual(n / 2, len(results)) # We know n is even. 480 self.assertEqual(n / 2, len(results)) # We know n is even.
450 self.assertEqual(n / 2, sum(results)) 481 self.assertEqual(n / 2, sum(results))
451 482
452 # Skip all arguments. 483 # Skip all arguments.
453 args = [2 * x for x in args] 484 args = [2 * x for x in args]
454 results = self._RunApply(_ReturnOneValue, args, process_count, thread_count, 485 results = self._RunApply(_ReturnOneValue, args, process_count, thread_count,
455 arg_checker=_SkipEvenNumbersArgChecker) 486 arg_checker=_SkipEvenNumbersArgChecker)
456 self.assertEqual(0, len(results)) 487 self.assertEqual(0, len(results))
457 488
458 489
459 class TestParallelismFrameworkWithoutMultiprocessing(TestParallelismFramework): 490 class TestParallelismFrameworkWithoutMultiprocessing(TestParallelismFramework):
460 """Tests that the parallelism framework works when the multiprocessing module 491 """Tests parallelism framework works with multiprocessing module unavailable.
461 is not available. Notably, this test has no way to override previous calls 492
462 to gslib.util.MultiprocessingIsAvailable to prevent the initialization of 493 Notably, this test has no way to override previous calls
463 all of the global variables in command.py, so this still behaves slightly 494 to gslib.util.MultiprocessingIsAvailable to prevent the initialization of
464 differently than the behavior one would see on a machine where the 495 all of the global variables in command.py, so this still behaves slightly
465 multiprocessing functionality is actually not available (in particular, it 496 differently than the behavior one would see on a machine where the
466 will not catch the case where a global variable that is not available for 497 multiprocessing functionality is actually not available (in particular, it
467 the sequential path is referenced before initialization). 498 will not catch the case where a global variable that is not available for
499 the sequential path is referenced before initialization).
468 """ 500 """
469 command_class = FakeCommandWithoutMultiprocessingModule 501 command_class = FakeCommandWithoutMultiprocessingModule
OLDNEW
« no previous file with comments | « gslib/tests/test_parallel_cp.py ('k') | gslib/tests/test_perfdiag.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698