Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(69)

Side by Side Diff: third_party/gsutil/gslib/tests/test_parallelism_framework.py

Issue 1380943003: Roll version of gsutil to 4.15. (Closed) Base URL: https://github.com/catapult-project/catapult.git@master
Patch Set: rebase Created 5 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 # -*- coding: utf-8 -*- 1 # -*- coding: utf-8 -*-
2 # Copyright 2013 Google Inc. All Rights Reserved. 2 # Copyright 2013 Google Inc. All Rights Reserved.
3 # 3 #
4 # Permission is hereby granted, free of charge, to any person obtaining a 4 # Permission is hereby granted, free of charge, to any person obtaining a
5 # copy of this software and associated documentation files (the 5 # copy of this software and associated documentation files (the
6 # "Software"), to deal in the Software without restriction, including 6 # "Software"), to deal in the Software without restriction, including
7 # without limitation the rights to use, copy, modify, merge, publish, dis- 7 # without limitation the rights to use, copy, modify, merge, publish, dis-
8 # tribute, sublicense, and/or sell copies of the Software, and to permit 8 # tribute, sublicense, and/or sell copies of the Software, and to permit
9 # persons to whom the Software is furnished to do so, subject to the fol- 9 # persons to whom the Software is furnished to do so, subject to the fol-
10 # lowing conditions: 10 # lowing conditions:
11 # 11 #
12 # The above copyright notice and this permission notice shall be included 12 # The above copyright notice and this permission notice shall be included
13 # in all copies or substantial portions of the Software. 13 # in all copies or substantial portions of the Software.
14 # 14 #
15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS 15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
16 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- 16 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
17 # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT 17 # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
18 # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, 18 # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
19 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 19 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 20 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
21 # IN THE SOFTWARE. 21 # IN THE SOFTWARE.
22 """Unit tests for gsutil parallelism framework.""" 22 """Unit tests for gsutil parallelism framework."""
23 23
24 from __future__ import absolute_import 24 from __future__ import absolute_import
25 25
26 import functools 26 import functools
27 import os
27 import signal 28 import signal
29 import threading
30 import time
28 31
29 from boto.storage_uri import BucketStorageUri 32 from boto.storage_uri import BucketStorageUri
30 from gslib import cs_api_map 33 from gslib import cs_api_map
31 from gslib.command import Command 34 from gslib.command import Command
32 from gslib.command import CreateGsutilLogger 35 from gslib.command import CreateGsutilLogger
33 from gslib.command import DummyArgChecker 36 from gslib.command import DummyArgChecker
34 import gslib.tests.testcase as testcase 37 import gslib.tests.testcase as testcase
35 from gslib.tests.testcase.base import RequiresIsolation 38 from gslib.tests.testcase.base import RequiresIsolation
36 from gslib.tests.util import unittest 39 from gslib.tests.util import unittest
40 from gslib.util import CheckMultiprocessingAvailableAndInit
37 from gslib.util import IS_WINDOWS 41 from gslib.util import IS_WINDOWS
38 from gslib.util import MultiprocessingIsAvailable
39 42
40 43
41 # Amount of time for an individual test to run before timing out. We need a 44 # Amount of time for an individual test to run before timing out. We need a
42 # reasonably high value since if many tests are running in parallel, an 45 # reasonably high value since if many tests are running in parallel, an
43 # individual test may take a while to complete. 46 # individual test may take a while to complete.
44 _TEST_TIMEOUT_SECONDS = 120 47 _TEST_TIMEOUT_SECONDS = 120
45 48
46 49
47 def Timeout(func): 50 def Timeout(func):
48 """Decorator used to provide a timeout for functions.""" 51 """Decorator used to provide a timeout for functions."""
(...skipping 18 matching lines...) Expand all
67 class CustomException(Exception): 70 class CustomException(Exception):
68 71
69 def __init__(self, exception_str): 72 def __init__(self, exception_str):
70 super(CustomException, self).__init__(exception_str) 73 super(CustomException, self).__init__(exception_str)
71 74
72 75
73 def _ReturnOneValue(cls, args, thread_state=None): 76 def _ReturnOneValue(cls, args, thread_state=None):
74 return 1 77 return 1
75 78
76 79
80 def _ReturnProcAndThreadId(cls, args, thread_state=None):
81 return (os.getpid(), threading.currentThread().ident)
82
83
84 def _SleepThenReturnProcAndThreadId(cls, args, thread_state=None):
85 # This can fail if the total time to spawn new processes and threads takes
86 # longer than 5 seconds, but if that occurs, then we have a performance
87 # problem that needs to be addressed.
88 time.sleep(5)
89 return _ReturnProcAndThreadId(cls, args, thread_state=thread_state)
90
91
77 def _FailureFunc(cls, args, thread_state=None): 92 def _FailureFunc(cls, args, thread_state=None):
78 raise CustomException('Failing on purpose.') 93 raise CustomException('Failing on purpose.')
79 94
80 95
81 def _FailingExceptionHandler(cls, e): 96 def _FailingExceptionHandler(cls, e):
82 cls.failure_count += 1 97 cls.failure_count += 1
83 raise CustomException('Exception handler failing on purpose.') 98 raise CustomException('Exception handler failing on purpose.')
84 99
85 100
86 def _ExceptionHandler(cls, e): 101 def _ExceptionHandler(cls, e):
(...skipping 120 matching lines...) Expand 10 before | Expand all | Expand 10 after
207 } 222 }
208 default_map = { 223 default_map = {
209 'gs': 'JSON', 224 'gs': 'JSON',
210 's3': 'XML' 225 's3': 'XML'
211 } 226 }
212 self.gsutil_api_map = cs_api_map.GsutilApiMapFactory.GetApiMap( 227 self.gsutil_api_map = cs_api_map.GsutilApiMapFactory.GetApiMap(
213 cs_api_map.GsutilApiClassMapFactory, support_map, default_map) 228 cs_api_map.GsutilApiClassMapFactory, support_map, default_map)
214 self.logger = CreateGsutilLogger('FakeCommand') 229 self.logger = CreateGsutilLogger('FakeCommand')
215 self.parallel_operations = do_parallel 230 self.parallel_operations = do_parallel
216 self.failure_count = 0 231 self.failure_count = 0
217 self.multiprocessing_is_available = MultiprocessingIsAvailable()[0] 232 self.multiprocessing_is_available = (
233 CheckMultiprocessingAvailableAndInit().is_available)
218 self.debug = 0 234 self.debug = 0
219 235
220 236
221 class FakeCommandWithoutMultiprocessingModule(FakeCommand): 237 class FakeCommandWithoutMultiprocessingModule(FakeCommand):
222 238
223 def __init__(self, do_parallel): 239 def __init__(self, do_parallel):
224 super(FakeCommandWithoutMultiprocessingModule, self).__init__(do_parallel) 240 super(FakeCommandWithoutMultiprocessingModule, self).__init__(do_parallel)
225 self.multiprocessing_is_available = False 241 self.multiprocessing_is_available = False
226 242
227 243
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after
265 self._TestBasicApply(3, 3) 281 self._TestBasicApply(3, 3)
266 282
267 @Timeout 283 @Timeout
268 def _TestBasicApply(self, process_count, thread_count): 284 def _TestBasicApply(self, process_count, thread_count):
269 args = [()] * (17 * process_count * thread_count + 1) 285 args = [()] * (17 * process_count * thread_count + 1)
270 286
271 results = self._RunApply(_ReturnOneValue, args, process_count, thread_count) 287 results = self._RunApply(_ReturnOneValue, args, process_count, thread_count)
272 self.assertEqual(len(args), len(results)) 288 self.assertEqual(len(args), len(results))
273 289
274 @RequiresIsolation 290 @RequiresIsolation
291 def testNoTasksSingleProcessSingleThread(self):
292 self._TestApplyWithNoTasks(1, 1)
293
294 @RequiresIsolation
295 def testNoTasksSingleProcessMultiThread(self):
296 self._TestApplyWithNoTasks(1, 3)
297
298 @RequiresIsolation
299 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
300 def testNoTasksMultiProcessSingleThread(self):
301 self._TestApplyWithNoTasks(3, 1)
302
303 @RequiresIsolation
304 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
305 def testNoTasksMultiProcessMultiThread(self):
306 self._TestApplyWithNoTasks(3, 3)
307
308 @Timeout
309 def _TestApplyWithNoTasks(self, process_count, thread_count):
310 """Tests that calling Apply with no tasks releases locks/semaphores."""
311 empty_args = [()]
312
313 for _ in range(process_count * thread_count + 1):
314 self._RunApply(_ReturnOneValue, empty_args, process_count, thread_count)
315
316 # Ensure that work can still be performed.
317 self._TestBasicApply(process_count, thread_count)
318
319 @RequiresIsolation
320 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
321 def testApplySaturatesMultiProcessSingleThread(self):
322 self._TestApplySaturatesAvailableProcessesAndThreads(3, 1)
323
324 @RequiresIsolation
325 def testApplySaturatesSingleProcessMultiThread(self):
326 self._TestApplySaturatesAvailableProcessesAndThreads(1, 3)
327
328 @RequiresIsolation
329 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
330 def testApplySaturatesMultiProcessMultiThread(self):
331 self._TestApplySaturatesAvailableProcessesAndThreads(3, 3)
332
333 @RequiresIsolation
334 def _TestApplySaturatesAvailableProcessesAndThreads(self, process_count,
335 thread_count):
336 """Tests that created processes and threads evenly share tasks."""
337 calls_per_thread = 2
338 args = [()] * (process_count * thread_count * calls_per_thread)
339 expected_calls_per_thread = calls_per_thread
340
341 if not self.command_class(True).multiprocessing_is_available:
342 # When multiprocessing is unavailable, only a single process is used.
343 # Calls should be evenly distributed across threads.
344 expected_calls_per_thread = calls_per_thread * process_count
345
346 results = self._RunApply(_SleepThenReturnProcAndThreadId, args,
347 process_count, thread_count)
348 usage_dict = {} # (process_id, thread_id): number of tasks performed
349 for (process_id, thread_id) in results:
350 usage_dict[(process_id, thread_id)] = (
351 usage_dict.get((process_id, thread_id), 0) + 1)
352
353 for (id_tuple, num_tasks_completed) in usage_dict.iteritems():
354 self.assertEqual(num_tasks_completed, expected_calls_per_thread,
355 'Process %s thread %s completed %s tasks. Expected: %s' %
356 (id_tuple[0], id_tuple[1], num_tasks_completed,
357 expected_calls_per_thread))
358
359 @RequiresIsolation
275 def testIteratorFailureSingleProcessSingleThread(self): 360 def testIteratorFailureSingleProcessSingleThread(self):
276 self._TestIteratorFailure(1, 1) 361 self._TestIteratorFailure(1, 1)
277 362
278 @RequiresIsolation 363 @RequiresIsolation
279 def testIteratorFailureSingleProcessMultiThread(self): 364 def testIteratorFailureSingleProcessMultiThread(self):
280 self._TestIteratorFailure(1, 3) 365 self._TestIteratorFailure(1, 3)
281 366
282 @RequiresIsolation 367 @RequiresIsolation
283 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') 368 @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
284 def testIteratorFailureMultiProcessSingleThread(self): 369 def testIteratorFailureMultiProcessSingleThread(self):
(...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after
346 command_inst.arg_length_sum = 19 431 command_inst.arg_length_sum = 19
347 args = ['foo', ['bar', 'baz'], [], ['x', 'y'], [], 'abcd'] 432 args = ['foo', ['bar', 'baz'], [], ['x', 'y'], [], 'abcd']
348 self._RunApply(_IncrementByLength, args, process_count, 433 self._RunApply(_IncrementByLength, args, process_count,
349 thread_count, command_inst=command_inst, 434 thread_count, command_inst=command_inst,
350 shared_attrs=['arg_length_sum']) 435 shared_attrs=['arg_length_sum'])
351 expected_sum = 19 436 expected_sum = 19
352 for arg in args: 437 for arg in args:
353 expected_sum += len(arg) 438 expected_sum += len(arg)
354 self.assertEqual(expected_sum, command_inst.arg_length_sum) 439 self.assertEqual(expected_sum, command_inst.arg_length_sum)
355 440
356 # Test that shared variables work when the iterator fails. 441 # Test that shared variables work when the iterator fails at the beginning,
357 command_inst = self.command_class(True) 442 # middle, and end.
358 args = FailingIterator(10, [1, 3, 5]) 443 for (failing_iterator, expected_failure_count) in (
359 self._RunApply(_ReturnOneValue, args, process_count, thread_count, 444 (FailingIterator(5, [0]), 1),
360 command_inst=command_inst, shared_attrs=['failure_count']) 445 (FailingIterator(10, [1, 3, 5]), 3),
361 self.assertEqual(3, command_inst.failure_count) 446 (FailingIterator(5, [4]), 1)):
447 command_inst = self.command_class(True)
448 args = failing_iterator
449 self._RunApply(_ReturnOneValue, args, process_count, thread_count,
450 command_inst=command_inst, shared_attrs=['failure_count'])
451 self.assertEqual(
452 expected_failure_count, command_inst.failure_count,
453 msg='Failure count did not match. Expected: %s, actual: %s '
454 'for failing iterator of size %s, failing indices %s' %
455 (expected_failure_count, command_inst.failure_count,
456 failing_iterator.size, failing_iterator.failure_indices))
362 457
363 @RequiresIsolation 458 @RequiresIsolation
364 def testThreadsSurviveExceptionsInFuncSingleProcessSingleThread(self): 459 def testThreadsSurviveExceptionsInFuncSingleProcessSingleThread(self):
365 self._TestThreadsSurviveExceptionsInFunc(1, 1) 460 self._TestThreadsSurviveExceptionsInFunc(1, 1)
366 461
367 @RequiresIsolation 462 @RequiresIsolation
368 def testThreadsSurviveExceptionsInFuncSingleProcessMultiThread(self): 463 def testThreadsSurviveExceptionsInFuncSingleProcessMultiThread(self):
369 self._TestThreadsSurviveExceptionsInFunc(1, 3) 464 self._TestThreadsSurviveExceptionsInFunc(1, 3)
370 465
371 @RequiresIsolation 466 @RequiresIsolation
(...skipping 185 matching lines...) Expand 10 before | Expand all | Expand 10 after
557 args = [2 * x for x in args] 652 args = [2 * x for x in args]
558 results = self._RunApply(_ReturnOneValue, args, process_count, thread_count, 653 results = self._RunApply(_ReturnOneValue, args, process_count, thread_count,
559 arg_checker=_SkipEvenNumbersArgChecker) 654 arg_checker=_SkipEvenNumbersArgChecker)
560 self.assertEqual(0, len(results)) 655 self.assertEqual(0, len(results))
561 656
562 657
563 class TestParallelismFrameworkWithoutMultiprocessing(TestParallelismFramework): 658 class TestParallelismFrameworkWithoutMultiprocessing(TestParallelismFramework):
564 """Tests parallelism framework works with multiprocessing module unavailable. 659 """Tests parallelism framework works with multiprocessing module unavailable.
565 660
566 Notably, this test has no way to override previous calls 661 Notably, this test has no way to override previous calls
567 to gslib.util.MultiprocessingIsAvailable to prevent the initialization of 662 to gslib.util.CheckMultiprocessingAvailableAndInit to prevent the
568 all of the global variables in command.py, so this still behaves slightly 663 initialization of all of the global variables in command.py, so this still
569 differently than the behavior one would see on a machine where the 664 behaves slightly differently than the behavior one would see on a machine
570 multiprocessing functionality is actually not available (in particular, it 665 where the multiprocessing functionality is actually not available (in
571 will not catch the case where a global variable that is not available for 666 particular, it will not catch the case where a global variable that is not
572 the sequential path is referenced before initialization). 667 available for the sequential path is referenced before initialization).
573 """ 668 """
574 command_class = FakeCommandWithoutMultiprocessingModule 669 command_class = FakeCommandWithoutMultiprocessingModule
OLDNEW
« no previous file with comments | « third_party/gsutil/gslib/tests/test_parallel_cp.py ('k') | third_party/gsutil/gslib/tests/test_perfdiag.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698