OLD | NEW |
| (Empty) |
1 # -*- coding: utf-8 -*- | |
2 # Copyright 2013 Google Inc. All Rights Reserved. | |
3 # | |
4 # Permission is hereby granted, free of charge, to any person obtaining a | |
5 # copy of this software and associated documentation files (the | |
6 # "Software"), to deal in the Software without restriction, including | |
7 # without limitation the rights to use, copy, modify, merge, publish, dis- | |
8 # tribute, sublicense, and/or sell copies of the Software, and to permit | |
9 # persons to whom the Software is furnished to do so, subject to the fol- | |
10 # lowing conditions: | |
11 # | |
12 # The above copyright notice and this permission notice shall be included | |
13 # in all copies or substantial portions of the Software. | |
14 # | |
15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS | |
16 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- | |
17 # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT | |
18 # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, | |
19 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
20 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS | |
21 # IN THE SOFTWARE. | |
22 """Unit tests for gsutil parallelism framework.""" | |
23 | |
24 from __future__ import absolute_import | |
25 | |
26 import functools | |
27 import signal | |
28 | |
29 from boto.storage_uri import BucketStorageUri | |
30 from gslib import cs_api_map | |
31 from gslib.command import Command | |
32 from gslib.command import CreateGsutilLogger | |
33 from gslib.command import DummyArgChecker | |
34 import gslib.tests.testcase as testcase | |
35 from gslib.tests.testcase.base import RequiresIsolation | |
36 from gslib.tests.util import unittest | |
37 from gslib.util import IS_WINDOWS | |
38 from gslib.util import MultiprocessingIsAvailable | |
39 | |
40 | |
41 # Amount of time for an individual test to run before timing out. We need a | |
42 # reasonably high value since if many tests are running in parallel, an | |
43 # individual test may take a while to complete. | |
44 _TEST_TIMEOUT_SECONDS = 120 | |
45 | |
46 | |
47 def Timeout(func): | |
48 """Decorator used to provide a timeout for functions.""" | |
49 @functools.wraps(func) | |
50 def Wrapper(*args, **kwargs): | |
51 if not IS_WINDOWS: | |
52 signal.signal(signal.SIGALRM, _HandleAlarm) | |
53 signal.alarm(_TEST_TIMEOUT_SECONDS) | |
54 try: | |
55 func(*args, **kwargs) | |
56 finally: | |
57 if not IS_WINDOWS: | |
58 signal.alarm(0) # Cancel the alarm. | |
59 return Wrapper | |
60 | |
61 | |
62 # pylint: disable=unused-argument | |
63 def _HandleAlarm(signal_num, cur_stack_frame): | |
64 raise Exception('Test timed out.') | |
65 | |
66 | |
67 class CustomException(Exception): | |
68 | |
69 def __init__(self, exception_str): | |
70 super(CustomException, self).__init__(exception_str) | |
71 | |
72 | |
73 def _ReturnOneValue(cls, args, thread_state=None): | |
74 return 1 | |
75 | |
76 | |
77 def _FailureFunc(cls, args, thread_state=None): | |
78 raise CustomException('Failing on purpose.') | |
79 | |
80 | |
81 def _FailingExceptionHandler(cls, e): | |
82 cls.failure_count += 1 | |
83 raise CustomException('Exception handler failing on purpose.') | |
84 | |
85 | |
86 def _ExceptionHandler(cls, e): | |
87 cls.logger.exception(e) | |
88 cls.failure_count += 1 | |
89 | |
90 | |
91 def _IncrementByLength(cls, args, thread_state=None): | |
92 cls.arg_length_sum += len(args) | |
93 | |
94 | |
95 def _AdjustProcessCountIfWindows(process_count): | |
96 if IS_WINDOWS: | |
97 return 1 | |
98 else: | |
99 return process_count | |
100 | |
101 | |
102 def _ReApplyWithReplicatedArguments(cls, args, thread_state=None): | |
103 """Calls Apply with arguments repeated seven times. | |
104 | |
105 The first two elements of args should be the process and thread counts, | |
106 respectively, to be used for the recursive calls. | |
107 | |
108 Args: | |
109 cls: The Command class to call Apply on. | |
110 args: Arguments to pass to Apply. | |
111 thread_state: Unused, required by function signature. | |
112 | |
113 Returns: | |
114 Number of values returned by the two calls to Apply. | |
115 """ | |
116 new_args = [args] * 7 | |
117 process_count = _AdjustProcessCountIfWindows(args[0]) | |
118 thread_count = args[1] | |
119 return_values = cls.Apply(_PerformNRecursiveCalls, new_args, | |
120 _ExceptionHandler, arg_checker=DummyArgChecker, | |
121 process_count=process_count, | |
122 thread_count=thread_count, | |
123 should_return_results=True) | |
124 ret = sum(return_values) | |
125 | |
126 return_values = cls.Apply(_ReturnOneValue, new_args, | |
127 _ExceptionHandler, arg_checker=DummyArgChecker, | |
128 process_count=process_count, | |
129 thread_count=thread_count, | |
130 should_return_results=True) | |
131 | |
132 return len(return_values) + ret | |
133 | |
134 | |
135 def _PerformNRecursiveCalls(cls, args, thread_state=None): | |
136 """Calls Apply to perform N recursive calls. | |
137 | |
138 The first two elements of args should be the process and thread counts, | |
139 respectively, to be used for the recursive calls, while N is the third element | |
140 (the number of recursive calls to make). | |
141 | |
142 Args: | |
143 cls: The Command class to call Apply on. | |
144 args: Arguments to pass to Apply. | |
145 thread_state: Unused, required by function signature. | |
146 | |
147 Returns: | |
148 Number of values returned by the call to Apply. | |
149 """ | |
150 process_count = _AdjustProcessCountIfWindows(args[0]) | |
151 thread_count = args[1] | |
152 return_values = cls.Apply(_ReturnOneValue, [()] * args[2], _ExceptionHandler, | |
153 arg_checker=DummyArgChecker, | |
154 process_count=process_count, | |
155 thread_count=thread_count, | |
156 should_return_results=True) | |
157 return len(return_values) | |
158 | |
159 | |
160 def _SkipEvenNumbersArgChecker(cls, arg): | |
161 return arg % 2 != 0 | |
162 | |
163 | |
164 class FailingIterator(object): | |
165 | |
166 def __init__(self, size, failure_indices): | |
167 self.size = size | |
168 self.failure_indices = failure_indices | |
169 self.current_index = 0 | |
170 | |
171 def __iter__(self): | |
172 return self | |
173 | |
174 def next(self): | |
175 if self.current_index == self.size: | |
176 raise StopIteration('') | |
177 elif self.current_index in self.failure_indices: | |
178 self.current_index += 1 | |
179 raise CustomException( | |
180 'Iterator failing on purpose at index %d.' % self.current_index) | |
181 else: | |
182 self.current_index += 1 | |
183 return self.current_index - 1 | |
184 | |
185 | |
186 class FakeCommand(Command): | |
187 """Fake command class for overriding command instance state.""" | |
188 command_spec = Command.CreateCommandSpec( | |
189 'fake', | |
190 command_name_aliases=[], | |
191 ) | |
192 # Help specification. See help_provider.py for documentation. | |
193 help_spec = Command.HelpSpec( | |
194 help_name='fake', | |
195 help_name_aliases=[], | |
196 help_type='command_help', | |
197 help_one_line_summary='Something to take up space.', | |
198 help_text='Something else to take up space.', | |
199 subcommand_help_text={}, | |
200 ) | |
201 | |
202 def __init__(self, do_parallel): | |
203 self.bucket_storage_uri_class = BucketStorageUri | |
204 support_map = { | |
205 'gs': ['JSON'], | |
206 's3': ['XML'] | |
207 } | |
208 default_map = { | |
209 'gs': 'JSON', | |
210 's3': 'XML' | |
211 } | |
212 self.gsutil_api_map = cs_api_map.GsutilApiMapFactory.GetApiMap( | |
213 cs_api_map.GsutilApiClassMapFactory, support_map, default_map) | |
214 self.logger = CreateGsutilLogger('FakeCommand') | |
215 self.parallel_operations = do_parallel | |
216 self.failure_count = 0 | |
217 self.multiprocessing_is_available = MultiprocessingIsAvailable()[0] | |
218 self.debug = 0 | |
219 | |
220 | |
221 class FakeCommandWithoutMultiprocessingModule(FakeCommand): | |
222 | |
223 def __init__(self, do_parallel): | |
224 super(FakeCommandWithoutMultiprocessingModule, self).__init__(do_parallel) | |
225 self.multiprocessing_is_available = False | |
226 | |
227 | |
228 # TODO: Figure out a good way to test that ctrl+C really stops execution, | |
229 # and also that ctrl+C works when there are still tasks enqueued. | |
230 class TestParallelismFramework(testcase.GsUtilUnitTestCase): | |
231 """gsutil parallelism framework test suite.""" | |
232 | |
233 command_class = FakeCommand | |
234 | |
235 def _RunApply(self, func, args_iterator, process_count, thread_count, | |
236 command_inst=None, shared_attrs=None, fail_on_error=False, | |
237 thr_exc_handler=None, arg_checker=DummyArgChecker): | |
238 command_inst = command_inst or self.command_class(True) | |
239 exception_handler = thr_exc_handler or _ExceptionHandler | |
240 | |
241 return command_inst.Apply(func, args_iterator, exception_handler, | |
242 thread_count=thread_count, | |
243 process_count=process_count, | |
244 arg_checker=arg_checker, | |
245 should_return_results=True, | |
246 shared_attrs=shared_attrs, | |
247 fail_on_error=fail_on_error) | |
248 | |
249 @RequiresIsolation | |
250 def testBasicApplySingleProcessSingleThread(self): | |
251 self._TestBasicApply(1, 1) | |
252 | |
253 @RequiresIsolation | |
254 def testBasicApplySingleProcessMultiThread(self): | |
255 self._TestBasicApply(1, 3) | |
256 | |
257 @RequiresIsolation | |
258 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') | |
259 def testBasicApplyMultiProcessSingleThread(self): | |
260 self._TestBasicApply(3, 1) | |
261 | |
262 @RequiresIsolation | |
263 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') | |
264 def testBasicApplyMultiProcessMultiThread(self): | |
265 self._TestBasicApply(3, 3) | |
266 | |
267 @Timeout | |
268 def _TestBasicApply(self, process_count, thread_count): | |
269 args = [()] * (17 * process_count * thread_count + 1) | |
270 | |
271 results = self._RunApply(_ReturnOneValue, args, process_count, thread_count) | |
272 self.assertEqual(len(args), len(results)) | |
273 | |
274 @RequiresIsolation | |
275 def testIteratorFailureSingleProcessSingleThread(self): | |
276 self._TestIteratorFailure(1, 1) | |
277 | |
278 @RequiresIsolation | |
279 def testIteratorFailureSingleProcessMultiThread(self): | |
280 self._TestIteratorFailure(1, 3) | |
281 | |
282 @RequiresIsolation | |
283 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') | |
284 def testIteratorFailureMultiProcessSingleThread(self): | |
285 self._TestIteratorFailure(3, 1) | |
286 | |
287 @RequiresIsolation | |
288 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') | |
289 def testIteratorFailureMultiProcessMultiThread(self): | |
290 self._TestIteratorFailure(3, 3) | |
291 | |
292 @Timeout | |
293 def _TestIteratorFailure(self, process_count, thread_count): | |
294 """Tests apply with a failing iterator.""" | |
295 # Tests for fail_on_error == False. | |
296 | |
297 args = FailingIterator(10, [0]) | |
298 results = self._RunApply(_ReturnOneValue, args, process_count, thread_count) | |
299 self.assertEqual(9, len(results)) | |
300 | |
301 args = FailingIterator(10, [5]) | |
302 results = self._RunApply(_ReturnOneValue, args, process_count, thread_count) | |
303 self.assertEqual(9, len(results)) | |
304 | |
305 args = FailingIterator(10, [9]) | |
306 results = self._RunApply(_ReturnOneValue, args, process_count, thread_count) | |
307 self.assertEqual(9, len(results)) | |
308 | |
309 if process_count * thread_count > 1: | |
310 # In this case, we should ignore the fail_on_error flag. | |
311 args = FailingIterator(10, [9]) | |
312 results = self._RunApply(_ReturnOneValue, args, process_count, | |
313 thread_count, fail_on_error=True) | |
314 self.assertEqual(9, len(results)) | |
315 | |
316 args = FailingIterator(10, range(10)) | |
317 results = self._RunApply(_ReturnOneValue, args, process_count, thread_count) | |
318 self.assertEqual(0, len(results)) | |
319 | |
320 args = FailingIterator(0, []) | |
321 results = self._RunApply(_ReturnOneValue, args, process_count, thread_count) | |
322 self.assertEqual(0, len(results)) | |
323 | |
324 @RequiresIsolation | |
325 def testTestSharedAttrsWorkSingleProcessSingleThread(self): | |
326 self._TestSharedAttrsWork(1, 1) | |
327 | |
328 @RequiresIsolation | |
329 def testTestSharedAttrsWorkSingleProcessMultiThread(self): | |
330 self._TestSharedAttrsWork(1, 3) | |
331 | |
332 @RequiresIsolation | |
333 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') | |
334 def testTestSharedAttrsWorkMultiProcessSingleThread(self): | |
335 self._TestSharedAttrsWork(3, 1) | |
336 | |
337 @RequiresIsolation | |
338 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') | |
339 def testTestSharedAttrsWorkMultiProcessMultiThread(self): | |
340 self._TestSharedAttrsWork(3, 3) | |
341 | |
342 @Timeout | |
343 def _TestSharedAttrsWork(self, process_count, thread_count): | |
344 """Tests that Apply successfully uses shared_attrs.""" | |
345 command_inst = self.command_class(True) | |
346 command_inst.arg_length_sum = 19 | |
347 args = ['foo', ['bar', 'baz'], [], ['x', 'y'], [], 'abcd'] | |
348 self._RunApply(_IncrementByLength, args, process_count, | |
349 thread_count, command_inst=command_inst, | |
350 shared_attrs=['arg_length_sum']) | |
351 expected_sum = 19 | |
352 for arg in args: | |
353 expected_sum += len(arg) | |
354 self.assertEqual(expected_sum, command_inst.arg_length_sum) | |
355 | |
356 # Test that shared variables work when the iterator fails. | |
357 command_inst = self.command_class(True) | |
358 args = FailingIterator(10, [1, 3, 5]) | |
359 self._RunApply(_ReturnOneValue, args, process_count, thread_count, | |
360 command_inst=command_inst, shared_attrs=['failure_count']) | |
361 self.assertEqual(3, command_inst.failure_count) | |
362 | |
363 @RequiresIsolation | |
364 def testThreadsSurviveExceptionsInFuncSingleProcessSingleThread(self): | |
365 self._TestThreadsSurviveExceptionsInFunc(1, 1) | |
366 | |
367 @RequiresIsolation | |
368 def testThreadsSurviveExceptionsInFuncSingleProcessMultiThread(self): | |
369 self._TestThreadsSurviveExceptionsInFunc(1, 3) | |
370 | |
371 @RequiresIsolation | |
372 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') | |
373 def testThreadsSurviveExceptionsInFuncMultiProcessSingleThread(self): | |
374 self._TestThreadsSurviveExceptionsInFunc(3, 1) | |
375 | |
376 @RequiresIsolation | |
377 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') | |
378 def testThreadsSurviveExceptionsInFuncMultiProcessMultiThread(self): | |
379 self._TestThreadsSurviveExceptionsInFunc(3, 3) | |
380 | |
381 @Timeout | |
382 def _TestThreadsSurviveExceptionsInFunc(self, process_count, thread_count): | |
383 command_inst = self.command_class(True) | |
384 args = ([()] * 5) | |
385 self._RunApply(_FailureFunc, args, process_count, thread_count, | |
386 command_inst=command_inst, shared_attrs=['failure_count'], | |
387 thr_exc_handler=_FailingExceptionHandler) | |
388 self.assertEqual(len(args), command_inst.failure_count) | |
389 | |
390 @RequiresIsolation | |
391 def testThreadsSurviveExceptionsInHandlerSingleProcessSingleThread(self): | |
392 self._TestThreadsSurviveExceptionsInHandler(1, 1) | |
393 | |
394 @RequiresIsolation | |
395 def testThreadsSurviveExceptionsInHandlerSingleProcessMultiThread(self): | |
396 self._TestThreadsSurviveExceptionsInHandler(1, 3) | |
397 | |
398 @RequiresIsolation | |
399 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') | |
400 def testThreadsSurviveExceptionsInHandlerMultiProcessSingleThread(self): | |
401 self._TestThreadsSurviveExceptionsInHandler(3, 1) | |
402 | |
403 @RequiresIsolation | |
404 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') | |
405 def testThreadsSurviveExceptionsInHandlerMultiProcessMultiThread(self): | |
406 self._TestThreadsSurviveExceptionsInHandler(3, 3) | |
407 | |
408 @Timeout | |
409 def _TestThreadsSurviveExceptionsInHandler(self, process_count, thread_count): | |
410 command_inst = self.command_class(True) | |
411 args = ([()] * 5) | |
412 self._RunApply(_FailureFunc, args, process_count, thread_count, | |
413 command_inst=command_inst, shared_attrs=['failure_count'], | |
414 thr_exc_handler=_FailingExceptionHandler) | |
415 self.assertEqual(len(args), command_inst.failure_count) | |
416 | |
417 @RequiresIsolation | |
418 @Timeout | |
419 def testFailOnErrorFlag(self): | |
420 """Tests that fail_on_error produces the correct exception on failure.""" | |
421 def _ExpectCustomException(test_func): | |
422 try: | |
423 test_func() | |
424 self.fail( | |
425 'Setting fail_on_error should raise any exception encountered.') | |
426 except CustomException, e: | |
427 pass | |
428 except Exception, e: | |
429 self.fail('Got unexpected error: ' + str(e)) | |
430 | |
431 def _RunFailureFunc(): | |
432 command_inst = self.command_class(True) | |
433 args = ([()] * 5) | |
434 self._RunApply(_FailureFunc, args, 1, 1, command_inst=command_inst, | |
435 shared_attrs=['failure_count'], fail_on_error=True) | |
436 _ExpectCustomException(_RunFailureFunc) | |
437 | |
438 def _RunFailingIteratorFirstPosition(): | |
439 args = FailingIterator(10, [0]) | |
440 results = self._RunApply(_ReturnOneValue, args, 1, 1, fail_on_error=True) | |
441 self.assertEqual(0, len(results)) | |
442 _ExpectCustomException(_RunFailingIteratorFirstPosition) | |
443 | |
444 def _RunFailingIteratorPositionMiddlePosition(): | |
445 args = FailingIterator(10, [5]) | |
446 results = self._RunApply(_ReturnOneValue, args, 1, 1, fail_on_error=True) | |
447 self.assertEqual(5, len(results)) | |
448 _ExpectCustomException(_RunFailingIteratorPositionMiddlePosition) | |
449 | |
450 def _RunFailingIteratorLastPosition(): | |
451 args = FailingIterator(10, [9]) | |
452 results = self._RunApply(_ReturnOneValue, args, 1, 1, fail_on_error=True) | |
453 self.assertEqual(9, len(results)) | |
454 _ExpectCustomException(_RunFailingIteratorLastPosition) | |
455 | |
456 def _RunFailingIteratorMultiplePositions(): | |
457 args = FailingIterator(10, [1, 3, 5]) | |
458 results = self._RunApply(_ReturnOneValue, args, 1, 1, fail_on_error=True) | |
459 self.assertEqual(1, len(results)) | |
460 _ExpectCustomException(_RunFailingIteratorMultiplePositions) | |
461 | |
462 @RequiresIsolation | |
463 def testRecursiveDepthThreeDifferentFunctionsSingleProcessSingleThread(self): | |
464 self._TestRecursiveDepthThreeDifferentFunctions(1, 1) | |
465 | |
466 @RequiresIsolation | |
467 def testRecursiveDepthThreeDifferentFunctionsSingleProcessMultiThread(self): | |
468 self._TestRecursiveDepthThreeDifferentFunctions(1, 3) | |
469 | |
470 @RequiresIsolation | |
471 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') | |
472 def testRecursiveDepthThreeDifferentFunctionsMultiProcessSingleThread(self): | |
473 self._TestRecursiveDepthThreeDifferentFunctions(3, 1) | |
474 | |
475 @RequiresIsolation | |
476 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') | |
477 def testRecursiveDepthThreeDifferentFunctionsMultiProcessMultiThread(self): | |
478 self._TestRecursiveDepthThreeDifferentFunctions(3, 3) | |
479 | |
480 @Timeout | |
481 def _TestRecursiveDepthThreeDifferentFunctions(self, process_count, | |
482 thread_count): | |
483 """Tests recursive application of Apply. | |
484 | |
485 Calls Apply(A), where A calls Apply(B) followed by Apply(C) and B calls | |
486 Apply(C). | |
487 | |
488 Args: | |
489 process_count: Number of processes to use. | |
490 thread_count: Number of threads to use. | |
491 """ | |
492 base_args = [3, 1, 4, 1, 5] | |
493 args = [[process_count, thread_count, count] for count in base_args] | |
494 | |
495 results = self._RunApply(_ReApplyWithReplicatedArguments, args, | |
496 process_count, thread_count) | |
497 self.assertEqual(7 * (sum(base_args) + len(base_args)), sum(results)) | |
498 | |
499 @RequiresIsolation | |
500 def testExceptionInProducerRaisesAndTerminatesSingleProcessSingleThread(self): | |
501 self._TestExceptionInProducerRaisesAndTerminates(1, 1) | |
502 | |
503 @RequiresIsolation | |
504 def testExceptionInProducerRaisesAndTerminatesSingleProcessMultiThread(self): | |
505 self._TestExceptionInProducerRaisesAndTerminates(1, 3) | |
506 | |
507 @RequiresIsolation | |
508 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') | |
509 def testExceptionInProducerRaisesAndTerminatesMultiProcessSingleThread(self): | |
510 self._TestExceptionInProducerRaisesAndTerminates(3, 1) | |
511 | |
512 @RequiresIsolation | |
513 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') | |
514 def testExceptionInProducerRaisesAndTerminatesMultiProcessMultiThread(self): | |
515 self._TestExceptionInProducerRaisesAndTerminates(3, 3) | |
516 | |
517 @Timeout | |
518 def _TestExceptionInProducerRaisesAndTerminates(self, process_count, | |
519 thread_count): | |
520 args = self # The ProducerThread will try and fail to iterate over this. | |
521 try: | |
522 self._RunApply(_ReturnOneValue, args, process_count, thread_count) | |
523 self.fail('Did not raise expected exception.') | |
524 except TypeError: | |
525 pass | |
526 | |
527 @RequiresIsolation | |
528 def testSkippedArgumentsSingleThreadSingleProcess(self): | |
529 self._TestSkippedArguments(1, 1) | |
530 | |
531 @RequiresIsolation | |
532 def testSkippedArgumentsMultiThreadSingleProcess(self): | |
533 self._TestSkippedArguments(1, 3) | |
534 | |
535 @RequiresIsolation | |
536 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') | |
537 def testSkippedArgumentsSingleThreadMultiProcess(self): | |
538 self._TestSkippedArguments(3, 1) | |
539 | |
540 @RequiresIsolation | |
541 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') | |
542 def testSkippedArgumentsMultiThreadMultiProcess(self): | |
543 self._TestSkippedArguments(3, 3) | |
544 | |
545 @Timeout | |
546 def _TestSkippedArguments(self, process_count, thread_count): | |
547 | |
548 # Skip a proper subset of the arguments. | |
549 n = 2 * process_count * thread_count | |
550 args = range(1, n + 1) | |
551 results = self._RunApply(_ReturnOneValue, args, process_count, thread_count, | |
552 arg_checker=_SkipEvenNumbersArgChecker) | |
553 self.assertEqual(n / 2, len(results)) # We know n is even. | |
554 self.assertEqual(n / 2, sum(results)) | |
555 | |
556 # Skip all arguments. | |
557 args = [2 * x for x in args] | |
558 results = self._RunApply(_ReturnOneValue, args, process_count, thread_count, | |
559 arg_checker=_SkipEvenNumbersArgChecker) | |
560 self.assertEqual(0, len(results)) | |
561 | |
562 | |
563 class TestParallelismFrameworkWithoutMultiprocessing(TestParallelismFramework): | |
564 """Tests parallelism framework works with multiprocessing module unavailable. | |
565 | |
566 Notably, this test has no way to override previous calls | |
567 to gslib.util.MultiprocessingIsAvailable to prevent the initialization of | |
568 all of the global variables in command.py, so this still behaves slightly | |
569 differently than the behavior one would see on a machine where the | |
570 multiprocessing functionality is actually not available (in particular, it | |
571 will not catch the case where a global variable that is not available for | |
572 the sequential path is referenced before initialization). | |
573 """ | |
574 command_class = FakeCommandWithoutMultiprocessingModule | |
OLD | NEW |