Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(20)

Side by Side Diff: third_party/gsutil/gslib/tests/test_parallelism_framework.py

Issue 1377933002: [catapult] - Copy Telemetry's gsutilz over to third_party. (Closed) Base URL: https://github.com/catapult-project/catapult.git@master
Patch Set: Rename to gsutil. Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 # -*- coding: utf-8 -*-
2 # Copyright 2013 Google Inc. All Rights Reserved.
3 #
4 # Permission is hereby granted, free of charge, to any person obtaining a
5 # copy of this software and associated documentation files (the
6 # "Software"), to deal in the Software without restriction, including
7 # without limitation the rights to use, copy, modify, merge, publish, dis-
8 # tribute, sublicense, and/or sell copies of the Software, and to permit
9 # persons to whom the Software is furnished to do so, subject to the fol-
10 # lowing conditions:
11 #
12 # The above copyright notice and this permission notice shall be included
13 # in all copies or substantial portions of the Software.
14 #
15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
16 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
17 # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
18 # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
19 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
21 # IN THE SOFTWARE.
22 """Unit tests for gsutil parallelism framework."""
23
24 from __future__ import absolute_import
25
26 import functools
27 import signal
28
29 from boto.storage_uri import BucketStorageUri
30 from gslib import cs_api_map
31 from gslib.command import Command
32 from gslib.command import CreateGsutilLogger
33 from gslib.command import DummyArgChecker
34 import gslib.tests.testcase as testcase
35 from gslib.tests.testcase.base import RequiresIsolation
36 from gslib.tests.util import unittest
37 from gslib.util import IS_WINDOWS
38 from gslib.util import MultiprocessingIsAvailable
39
40
41 # Amount of time for an individual test to run before timing out. We need a
42 # reasonably high value since if many tests are running in parallel, an
43 # individual test may take a while to complete.
44 _TEST_TIMEOUT_SECONDS = 120
45
46
47 def Timeout(func):
48 """Decorator used to provide a timeout for functions."""
49 @functools.wraps(func)
50 def Wrapper(*args, **kwargs):
51 if not IS_WINDOWS:
52 signal.signal(signal.SIGALRM, _HandleAlarm)
53 signal.alarm(_TEST_TIMEOUT_SECONDS)
54 try:
55 func(*args, **kwargs)
56 finally:
57 if not IS_WINDOWS:
58 signal.alarm(0) # Cancel the alarm.
59 return Wrapper
60
61
62 # pylint: disable=unused-argument
63 def _HandleAlarm(signal_num, cur_stack_frame):
64 raise Exception('Test timed out.')
65
66
67 class CustomException(Exception):
68
69 def __init__(self, exception_str):
70 super(CustomException, self).__init__(exception_str)
71
72
73 def _ReturnOneValue(cls, args, thread_state=None):
74 return 1
75
76
77 def _FailureFunc(cls, args, thread_state=None):
78 raise CustomException('Failing on purpose.')
79
80
81 def _FailingExceptionHandler(cls, e):
82 cls.failure_count += 1
83 raise CustomException('Exception handler failing on purpose.')
84
85
86 def _ExceptionHandler(cls, e):
87 cls.logger.exception(e)
88 cls.failure_count += 1
89
90
91 def _IncrementByLength(cls, args, thread_state=None):
92 cls.arg_length_sum += len(args)
93
94
95 def _AdjustProcessCountIfWindows(process_count):
96 if IS_WINDOWS:
97 return 1
98 else:
99 return process_count
100
101
102 def _ReApplyWithReplicatedArguments(cls, args, thread_state=None):
103 """Calls Apply with arguments repeated seven times.
104
105 The first two elements of args should be the process and thread counts,
106 respectively, to be used for the recursive calls.
107
108 Args:
109 cls: The Command class to call Apply on.
110 args: Arguments to pass to Apply.
111 thread_state: Unused, required by function signature.
112
113 Returns:
114 Number of values returned by the two calls to Apply.
115 """
116 new_args = [args] * 7
117 process_count = _AdjustProcessCountIfWindows(args[0])
118 thread_count = args[1]
119 return_values = cls.Apply(_PerformNRecursiveCalls, new_args,
120 _ExceptionHandler, arg_checker=DummyArgChecker,
121 process_count=process_count,
122 thread_count=thread_count,
123 should_return_results=True)
124 ret = sum(return_values)
125
126 return_values = cls.Apply(_ReturnOneValue, new_args,
127 _ExceptionHandler, arg_checker=DummyArgChecker,
128 process_count=process_count,
129 thread_count=thread_count,
130 should_return_results=True)
131
132 return len(return_values) + ret
133
134
135 def _PerformNRecursiveCalls(cls, args, thread_state=None):
136 """Calls Apply to perform N recursive calls.
137
138 The first two elements of args should be the process and thread counts,
139 respectively, to be used for the recursive calls, while N is the third element
140 (the number of recursive calls to make).
141
142 Args:
143 cls: The Command class to call Apply on.
144 args: Arguments to pass to Apply.
145 thread_state: Unused, required by function signature.
146
147 Returns:
148 Number of values returned by the call to Apply.
149 """
150 process_count = _AdjustProcessCountIfWindows(args[0])
151 thread_count = args[1]
152 return_values = cls.Apply(_ReturnOneValue, [()] * args[2], _ExceptionHandler,
153 arg_checker=DummyArgChecker,
154 process_count=process_count,
155 thread_count=thread_count,
156 should_return_results=True)
157 return len(return_values)
158
159
160 def _SkipEvenNumbersArgChecker(cls, arg):
161 return arg % 2 != 0
162
163
164 class FailingIterator(object):
165
166 def __init__(self, size, failure_indices):
167 self.size = size
168 self.failure_indices = failure_indices
169 self.current_index = 0
170
171 def __iter__(self):
172 return self
173
174 def next(self):
175 if self.current_index == self.size:
176 raise StopIteration('')
177 elif self.current_index in self.failure_indices:
178 self.current_index += 1
179 raise CustomException(
180 'Iterator failing on purpose at index %d.' % self.current_index)
181 else:
182 self.current_index += 1
183 return self.current_index - 1
184
185
186 class FakeCommand(Command):
187 """Fake command class for overriding command instance state."""
188 command_spec = Command.CreateCommandSpec(
189 'fake',
190 command_name_aliases=[],
191 )
192 # Help specification. See help_provider.py for documentation.
193 help_spec = Command.HelpSpec(
194 help_name='fake',
195 help_name_aliases=[],
196 help_type='command_help',
197 help_one_line_summary='Something to take up space.',
198 help_text='Something else to take up space.',
199 subcommand_help_text={},
200 )
201
202 def __init__(self, do_parallel):
203 self.bucket_storage_uri_class = BucketStorageUri
204 support_map = {
205 'gs': ['JSON'],
206 's3': ['XML']
207 }
208 default_map = {
209 'gs': 'JSON',
210 's3': 'XML'
211 }
212 self.gsutil_api_map = cs_api_map.GsutilApiMapFactory.GetApiMap(
213 cs_api_map.GsutilApiClassMapFactory, support_map, default_map)
214 self.logger = CreateGsutilLogger('FakeCommand')
215 self.parallel_operations = do_parallel
216 self.failure_count = 0
217 self.multiprocessing_is_available = MultiprocessingIsAvailable()[0]
218 self.debug = 0
219
220
221 class FakeCommandWithoutMultiprocessingModule(FakeCommand):
222
223 def __init__(self, do_parallel):
224 super(FakeCommandWithoutMultiprocessingModule, self).__init__(do_parallel)
225 self.multiprocessing_is_available = False
226
227
228 # TODO: Figure out a good way to test that ctrl+C really stops execution,
229 # and also that ctrl+C works when there are still tasks enqueued.
230 class TestParallelismFramework(testcase.GsUtilUnitTestCase):
231 """gsutil parallelism framework test suite."""
232
233 command_class = FakeCommand
234
235 def _RunApply(self, func, args_iterator, process_count, thread_count,
236 command_inst=None, shared_attrs=None, fail_on_error=False,
237 thr_exc_handler=None, arg_checker=DummyArgChecker):
238 command_inst = command_inst or self.command_class(True)
239 exception_handler = thr_exc_handler or _ExceptionHandler
240
241 return command_inst.Apply(func, args_iterator, exception_handler,
242 thread_count=thread_count,
243 process_count=process_count,
244 arg_checker=arg_checker,
245 should_return_results=True,
246 shared_attrs=shared_attrs,
247 fail_on_error=fail_on_error)
248
249 @RequiresIsolation
250 def testBasicApplySingleProcessSingleThread(self):
251 self._TestBasicApply(1, 1)
252
253 @RequiresIsolation
254 def testBasicApplySingleProcessMultiThread(self):
255 self._TestBasicApply(1, 3)
256
257 @RequiresIsolation
258 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
259 def testBasicApplyMultiProcessSingleThread(self):
260 self._TestBasicApply(3, 1)
261
262 @RequiresIsolation
263 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
264 def testBasicApplyMultiProcessMultiThread(self):
265 self._TestBasicApply(3, 3)
266
267 @Timeout
268 def _TestBasicApply(self, process_count, thread_count):
269 args = [()] * (17 * process_count * thread_count + 1)
270
271 results = self._RunApply(_ReturnOneValue, args, process_count, thread_count)
272 self.assertEqual(len(args), len(results))
273
274 @RequiresIsolation
275 def testIteratorFailureSingleProcessSingleThread(self):
276 self._TestIteratorFailure(1, 1)
277
278 @RequiresIsolation
279 def testIteratorFailureSingleProcessMultiThread(self):
280 self._TestIteratorFailure(1, 3)
281
282 @RequiresIsolation
283 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
284 def testIteratorFailureMultiProcessSingleThread(self):
285 self._TestIteratorFailure(3, 1)
286
287 @RequiresIsolation
288 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
289 def testIteratorFailureMultiProcessMultiThread(self):
290 self._TestIteratorFailure(3, 3)
291
292 @Timeout
293 def _TestIteratorFailure(self, process_count, thread_count):
294 """Tests apply with a failing iterator."""
295 # Tests for fail_on_error == False.
296
297 args = FailingIterator(10, [0])
298 results = self._RunApply(_ReturnOneValue, args, process_count, thread_count)
299 self.assertEqual(9, len(results))
300
301 args = FailingIterator(10, [5])
302 results = self._RunApply(_ReturnOneValue, args, process_count, thread_count)
303 self.assertEqual(9, len(results))
304
305 args = FailingIterator(10, [9])
306 results = self._RunApply(_ReturnOneValue, args, process_count, thread_count)
307 self.assertEqual(9, len(results))
308
309 if process_count * thread_count > 1:
310 # In this case, we should ignore the fail_on_error flag.
311 args = FailingIterator(10, [9])
312 results = self._RunApply(_ReturnOneValue, args, process_count,
313 thread_count, fail_on_error=True)
314 self.assertEqual(9, len(results))
315
316 args = FailingIterator(10, range(10))
317 results = self._RunApply(_ReturnOneValue, args, process_count, thread_count)
318 self.assertEqual(0, len(results))
319
320 args = FailingIterator(0, [])
321 results = self._RunApply(_ReturnOneValue, args, process_count, thread_count)
322 self.assertEqual(0, len(results))
323
324 @RequiresIsolation
325 def testTestSharedAttrsWorkSingleProcessSingleThread(self):
326 self._TestSharedAttrsWork(1, 1)
327
328 @RequiresIsolation
329 def testTestSharedAttrsWorkSingleProcessMultiThread(self):
330 self._TestSharedAttrsWork(1, 3)
331
332 @RequiresIsolation
333 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
334 def testTestSharedAttrsWorkMultiProcessSingleThread(self):
335 self._TestSharedAttrsWork(3, 1)
336
337 @RequiresIsolation
338 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
339 def testTestSharedAttrsWorkMultiProcessMultiThread(self):
340 self._TestSharedAttrsWork(3, 3)
341
342 @Timeout
343 def _TestSharedAttrsWork(self, process_count, thread_count):
344 """Tests that Apply successfully uses shared_attrs."""
345 command_inst = self.command_class(True)
346 command_inst.arg_length_sum = 19
347 args = ['foo', ['bar', 'baz'], [], ['x', 'y'], [], 'abcd']
348 self._RunApply(_IncrementByLength, args, process_count,
349 thread_count, command_inst=command_inst,
350 shared_attrs=['arg_length_sum'])
351 expected_sum = 19
352 for arg in args:
353 expected_sum += len(arg)
354 self.assertEqual(expected_sum, command_inst.arg_length_sum)
355
356 # Test that shared variables work when the iterator fails.
357 command_inst = self.command_class(True)
358 args = FailingIterator(10, [1, 3, 5])
359 self._RunApply(_ReturnOneValue, args, process_count, thread_count,
360 command_inst=command_inst, shared_attrs=['failure_count'])
361 self.assertEqual(3, command_inst.failure_count)
362
363 @RequiresIsolation
364 def testThreadsSurviveExceptionsInFuncSingleProcessSingleThread(self):
365 self._TestThreadsSurviveExceptionsInFunc(1, 1)
366
367 @RequiresIsolation
368 def testThreadsSurviveExceptionsInFuncSingleProcessMultiThread(self):
369 self._TestThreadsSurviveExceptionsInFunc(1, 3)
370
371 @RequiresIsolation
372 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
373 def testThreadsSurviveExceptionsInFuncMultiProcessSingleThread(self):
374 self._TestThreadsSurviveExceptionsInFunc(3, 1)
375
376 @RequiresIsolation
377 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
378 def testThreadsSurviveExceptionsInFuncMultiProcessMultiThread(self):
379 self._TestThreadsSurviveExceptionsInFunc(3, 3)
380
381 @Timeout
382 def _TestThreadsSurviveExceptionsInFunc(self, process_count, thread_count):
383 command_inst = self.command_class(True)
384 args = ([()] * 5)
385 self._RunApply(_FailureFunc, args, process_count, thread_count,
386 command_inst=command_inst, shared_attrs=['failure_count'],
387 thr_exc_handler=_FailingExceptionHandler)
388 self.assertEqual(len(args), command_inst.failure_count)
389
390 @RequiresIsolation
391 def testThreadsSurviveExceptionsInHandlerSingleProcessSingleThread(self):
392 self._TestThreadsSurviveExceptionsInHandler(1, 1)
393
394 @RequiresIsolation
395 def testThreadsSurviveExceptionsInHandlerSingleProcessMultiThread(self):
396 self._TestThreadsSurviveExceptionsInHandler(1, 3)
397
398 @RequiresIsolation
399 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
400 def testThreadsSurviveExceptionsInHandlerMultiProcessSingleThread(self):
401 self._TestThreadsSurviveExceptionsInHandler(3, 1)
402
403 @RequiresIsolation
404 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
405 def testThreadsSurviveExceptionsInHandlerMultiProcessMultiThread(self):
406 self._TestThreadsSurviveExceptionsInHandler(3, 3)
407
408 @Timeout
409 def _TestThreadsSurviveExceptionsInHandler(self, process_count, thread_count):
410 command_inst = self.command_class(True)
411 args = ([()] * 5)
412 self._RunApply(_FailureFunc, args, process_count, thread_count,
413 command_inst=command_inst, shared_attrs=['failure_count'],
414 thr_exc_handler=_FailingExceptionHandler)
415 self.assertEqual(len(args), command_inst.failure_count)
416
417 @RequiresIsolation
418 @Timeout
419 def testFailOnErrorFlag(self):
420 """Tests that fail_on_error produces the correct exception on failure."""
421 def _ExpectCustomException(test_func):
422 try:
423 test_func()
424 self.fail(
425 'Setting fail_on_error should raise any exception encountered.')
426 except CustomException, e:
427 pass
428 except Exception, e:
429 self.fail('Got unexpected error: ' + str(e))
430
431 def _RunFailureFunc():
432 command_inst = self.command_class(True)
433 args = ([()] * 5)
434 self._RunApply(_FailureFunc, args, 1, 1, command_inst=command_inst,
435 shared_attrs=['failure_count'], fail_on_error=True)
436 _ExpectCustomException(_RunFailureFunc)
437
438 def _RunFailingIteratorFirstPosition():
439 args = FailingIterator(10, [0])
440 results = self._RunApply(_ReturnOneValue, args, 1, 1, fail_on_error=True)
441 self.assertEqual(0, len(results))
442 _ExpectCustomException(_RunFailingIteratorFirstPosition)
443
444 def _RunFailingIteratorPositionMiddlePosition():
445 args = FailingIterator(10, [5])
446 results = self._RunApply(_ReturnOneValue, args, 1, 1, fail_on_error=True)
447 self.assertEqual(5, len(results))
448 _ExpectCustomException(_RunFailingIteratorPositionMiddlePosition)
449
450 def _RunFailingIteratorLastPosition():
451 args = FailingIterator(10, [9])
452 results = self._RunApply(_ReturnOneValue, args, 1, 1, fail_on_error=True)
453 self.assertEqual(9, len(results))
454 _ExpectCustomException(_RunFailingIteratorLastPosition)
455
456 def _RunFailingIteratorMultiplePositions():
457 args = FailingIterator(10, [1, 3, 5])
458 results = self._RunApply(_ReturnOneValue, args, 1, 1, fail_on_error=True)
459 self.assertEqual(1, len(results))
460 _ExpectCustomException(_RunFailingIteratorMultiplePositions)
461
462 @RequiresIsolation
463 def testRecursiveDepthThreeDifferentFunctionsSingleProcessSingleThread(self):
464 self._TestRecursiveDepthThreeDifferentFunctions(1, 1)
465
466 @RequiresIsolation
467 def testRecursiveDepthThreeDifferentFunctionsSingleProcessMultiThread(self):
468 self._TestRecursiveDepthThreeDifferentFunctions(1, 3)
469
470 @RequiresIsolation
471 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
472 def testRecursiveDepthThreeDifferentFunctionsMultiProcessSingleThread(self):
473 self._TestRecursiveDepthThreeDifferentFunctions(3, 1)
474
475 @RequiresIsolation
476 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
477 def testRecursiveDepthThreeDifferentFunctionsMultiProcessMultiThread(self):
478 self._TestRecursiveDepthThreeDifferentFunctions(3, 3)
479
480 @Timeout
481 def _TestRecursiveDepthThreeDifferentFunctions(self, process_count,
482 thread_count):
483 """Tests recursive application of Apply.
484
485 Calls Apply(A), where A calls Apply(B) followed by Apply(C) and B calls
486 Apply(C).
487
488 Args:
489 process_count: Number of processes to use.
490 thread_count: Number of threads to use.
491 """
492 base_args = [3, 1, 4, 1, 5]
493 args = [[process_count, thread_count, count] for count in base_args]
494
495 results = self._RunApply(_ReApplyWithReplicatedArguments, args,
496 process_count, thread_count)
497 self.assertEqual(7 * (sum(base_args) + len(base_args)), sum(results))
498
499 @RequiresIsolation
500 def testExceptionInProducerRaisesAndTerminatesSingleProcessSingleThread(self):
501 self._TestExceptionInProducerRaisesAndTerminates(1, 1)
502
503 @RequiresIsolation
504 def testExceptionInProducerRaisesAndTerminatesSingleProcessMultiThread(self):
505 self._TestExceptionInProducerRaisesAndTerminates(1, 3)
506
507 @RequiresIsolation
508 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
509 def testExceptionInProducerRaisesAndTerminatesMultiProcessSingleThread(self):
510 self._TestExceptionInProducerRaisesAndTerminates(3, 1)
511
512 @RequiresIsolation
513 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
514 def testExceptionInProducerRaisesAndTerminatesMultiProcessMultiThread(self):
515 self._TestExceptionInProducerRaisesAndTerminates(3, 3)
516
517 @Timeout
518 def _TestExceptionInProducerRaisesAndTerminates(self, process_count,
519 thread_count):
520 args = self # The ProducerThread will try and fail to iterate over this.
521 try:
522 self._RunApply(_ReturnOneValue, args, process_count, thread_count)
523 self.fail('Did not raise expected exception.')
524 except TypeError:
525 pass
526
527 @RequiresIsolation
528 def testSkippedArgumentsSingleThreadSingleProcess(self):
529 self._TestSkippedArguments(1, 1)
530
531 @RequiresIsolation
532 def testSkippedArgumentsMultiThreadSingleProcess(self):
533 self._TestSkippedArguments(1, 3)
534
535 @RequiresIsolation
536 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
537 def testSkippedArgumentsSingleThreadMultiProcess(self):
538 self._TestSkippedArguments(3, 1)
539
540 @RequiresIsolation
541 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
542 def testSkippedArgumentsMultiThreadMultiProcess(self):
543 self._TestSkippedArguments(3, 3)
544
545 @Timeout
546 def _TestSkippedArguments(self, process_count, thread_count):
547
548 # Skip a proper subset of the arguments.
549 n = 2 * process_count * thread_count
550 args = range(1, n + 1)
551 results = self._RunApply(_ReturnOneValue, args, process_count, thread_count,
552 arg_checker=_SkipEvenNumbersArgChecker)
553 self.assertEqual(n / 2, len(results)) # We know n is even.
554 self.assertEqual(n / 2, sum(results))
555
556 # Skip all arguments.
557 args = [2 * x for x in args]
558 results = self._RunApply(_ReturnOneValue, args, process_count, thread_count,
559 arg_checker=_SkipEvenNumbersArgChecker)
560 self.assertEqual(0, len(results))
561
562
563 class TestParallelismFrameworkWithoutMultiprocessing(TestParallelismFramework):
564 """Tests parallelism framework works with multiprocessing module unavailable.
565
566 Notably, this test has no way to override previous calls
567 to gslib.util.MultiprocessingIsAvailable to prevent the initialization of
568 all of the global variables in command.py, so this still behaves slightly
569 differently than the behavior one would see on a machine where the
570 multiprocessing functionality is actually not available (in particular, it
571 will not catch the case where a global variable that is not available for
572 the sequential path is referenced before initialization).
573 """
574 command_class = FakeCommandWithoutMultiprocessingModule
OLDNEW
« no previous file with comments | « third_party/gsutil/gslib/tests/test_parallel_cp.py ('k') | third_party/gsutil/gslib/tests/test_perfdiag.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698