OLD | NEW |
(Empty) | |
| 1 # Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 # Use of this source code is governed by a BSD-style license that can be |
| 3 # found in the LICENSE file. |
| 4 |
| 5 import mock |
| 6 import re |
| 7 import threading |
| 8 import time |
| 9 import unittest |
| 10 |
| 11 from testing_support import thread_watcher |
| 12 |
| 13 class TestNoExtraThreads(thread_watcher.TestCase): |
| 14 def mock_test(self): |
| 15 pass |
| 16 |
| 17 |
| 18 class TestExtraThreads(thread_watcher.TestCase): |
| 19 def _start_thread(self, name='foo'): |
| 20 stop = threading.Event() |
| 21 def thread_func(): |
| 22 while not stop.is_set(): |
| 23 time.sleep(0.01) |
| 24 |
| 25 t = threading.Thread(target=thread_func, name=name) |
| 26 t.start() |
| 27 return t, stop |
| 28 |
| 29 def mock_test(self): |
| 30 self.t1, self.stop1 = self._start_thread('foo') |
| 31 self.t2, self.stop2 = self._start_thread('bar') |
| 32 |
| 33 def stopThreads(self): |
| 34 self.stop1.set() |
| 35 self.stop2.set() |
| 36 self.t1.join() |
| 37 self.t2.join() |
| 38 |
| 39 |
| 40 class ThreadWatcherTestCase(thread_watcher.TestCase): |
| 41 @mock.patch('unittest.TestCase.fail') |
| 42 def test_no_extra_threads(self, fail_mock): |
| 43 TestNoExtraThreads('mock_test').run() |
| 44 self.assertFalse(fail_mock.called) |
| 45 |
| 46 @mock.patch('unittest.TestCase.fail') |
| 47 def test_extra_threads(self, fail_mock): |
| 48 t = TestExtraThreads('mock_test') |
| 49 t.run() |
| 50 t.stopThreads() |
| 51 |
| 52 self.assertEqual(len(fail_mock.call_args_list), 1) |
| 53 self.assertEqual(len(fail_mock.call_args_list[0]), 2) |
| 54 self.assertEqual(len(fail_mock.call_args_list[0][0]), 1) |
| 55 self.assertEqual(len(fail_mock.call_args_list[0][1]), 0) |
| 56 |
| 57 error_message = fail_mock.call_args_list[0][0][0] |
| 58 self.assertRegexpMatches( |
| 59 error_message, |
| 60 re.compile('^Found 2 running thread\(s\) after the test.\n\n' |
| 61 'Thread <Thread\(foo, started \d+\)> stacktrace:\n' |
| 62 ' .*\n\n' |
| 63 'Thread <Thread\(bar, started \d+\)> stacktrace:\n' |
| 64 ' .*\n$', re.DOTALL) |
| 65 ) |
| 66 self.assertNotIn(' Thread stopped while acquiring stacktrace.\n', error_mes
sage) |
| 67 |
| 68 @mock.patch('unittest.TestCase.fail') |
| 69 def test_fail_get_stacktrace(self, fail_mock): |
| 70 with mock.patch('sys._current_frames', mock.Mock(return_value={})): |
| 71 t = TestExtraThreads('mock_test') |
| 72 t.run() |
| 73 t.stopThreads() |
| 74 |
| 75 self.assertEqual(len(fail_mock.call_args_list), 1) |
| 76 self.assertEqual(len(fail_mock.call_args_list[0]), 2) |
| 77 self.assertEqual(len(fail_mock.call_args_list[0][0]), 1) |
| 78 self.assertEqual(len(fail_mock.call_args_list[0][1]), 0) |
| 79 |
| 80 error_message = fail_mock.call_args_list[0][0][0] |
| 81 self.assertIn(' Thread stopped while acquiring stacktrace.\n', error_messag
e) |
| 82 |
| 83 |
| 84 if __name__ == '__main__': |
| 85 unittest.main() |
OLD | NEW |