OLD | NEW |
---|---|
1 # Copyright 2016 The Chromium Authors. All rights reserved. | 1 # Copyright 2016 The Chromium Authors. All rights reserved. |
2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
4 | 4 |
5 import mock | |
6 import re | 5 import re |
6 import sys | |
7 import threading | 7 import threading |
8 import time | |
9 import unittest | 8 import unittest |
10 | 9 |
11 from testing_support import thread_watcher | 10 from testing_support import thread_watcher |
12 | 11 |
13 class TestNoExtraThreads(thread_watcher.TestCase): | |
14 def mock_test(self): | |
15 pass | |
16 | 12 |
13 class _PuppetThread(threading.Thread): | |
14 def __init__(self, name): | |
15 super(_PuppetThread, self).__init__(name=name) | |
16 self.daemon = True | |
17 self._start_event = threading.Event() | |
18 self._stop_event = threading.Event() | |
19 self.start() | |
20 # For the thread to actually start. | |
21 self._start_event.wait() | |
17 | 22 |
18 class TestExtraThreads(thread_watcher.TestCase): | 23 def run(self): |
19 def _start_thread(self, name='foo'): | 24 self._start_event.set() |
20 stop = threading.Event() | 25 self._stop_event.wait() |
21 def thread_func(): | |
22 while not stop.is_set(): | |
23 time.sleep(0.01) | |
24 | 26 |
25 t = threading.Thread(target=thread_func, name=name) | 27 def stop(self): |
26 t.start() | 28 self._stop_event.set() |
27 return t, stop | 29 self.join() |
28 | |
29 def mock_test(self): | |
30 self.t1, self.stop1 = self._start_thread('foo') | |
31 self.t2, self.stop2 = self._start_thread('bar') | |
32 | |
33 def stopThreads(self): | |
34 self.stop1.set() | |
35 self.stop2.set() | |
36 self.t1.join() | |
37 self.t2.join() | |
38 | 30 |
39 | 31 |
40 class ThreadWatcherTestCase(thread_watcher.TestCase): | 32 class ThreadWatcherTestCase(thread_watcher.TestCase): |
41 @mock.patch('unittest.TestCase.fail') | 33 def setUp(self): |
42 def test_no_extra_threads(self, fail_mock): | 34 self._fail_called = [] |
43 TestNoExtraThreads('mock_test').run() | 35 self.watcher = thread_watcher.ThreadWatcherMixIn() |
44 self.assertFalse(fail_mock.called) | 36 self.watcher.fail = lambda x: self._fail_called.append(x) |
37 self._threads = [] | |
45 | 38 |
46 @mock.patch('unittest.TestCase.fail') | 39 def tearDown(self): |
47 def test_extra_threads(self, fail_mock): | 40 for t in self._threads: |
48 t = TestExtraThreads('mock_test') | 41 t.stop() |
49 t.run() | |
50 t.stopThreads() | |
51 | 42 |
52 self.assertEqual(len(fail_mock.call_args_list), 1) | 43 def test_no_extra_threads(self): |
53 self.assertEqual(len(fail_mock.call_args_list[0]), 2) | 44 self.watcher.setUp() |
54 self.assertEqual(len(fail_mock.call_args_list[0][0]), 1) | 45 self.watcher.tearDown() |
55 self.assertEqual(len(fail_mock.call_args_list[0][1]), 0) | 46 self.assertEqual(self._fail_called, []) |
56 | 47 |
57 error_message = fail_mock.call_args_list[0][0][0] | 48 def test_no_extra_threads_start_stop(self): |
49 self.watcher.setUp() | |
50 self._threads.append(_PuppetThread('puppet1')) | |
51 self._threads[-1].stop() | |
52 self.watcher.tearDown() | |
53 self.assertEqual(self._fail_called, []) | |
54 | |
55 def test_extra_threads(self): | |
56 self.watcher.setUp() | |
57 self._threads.append(_PuppetThread('foo')) | |
58 self._threads.append(_PuppetThread('bar')) | |
59 self.watcher.tearDown() | |
60 self.assertEqual(len(self._fail_called), 1) | |
61 error_message = self._fail_called[0] | |
58 self.assertRegexpMatches( | 62 self.assertRegexpMatches( |
59 error_message, | 63 error_message, |
60 re.compile('^Found 2 running thread\(s\) after the test.\n\n' | 64 re.compile( |
61 'Thread <Thread\(foo, started \d+\)> stacktrace:\n' | 65 '^Found 2 running thread\(s\) after the test.\n\n' |
62 ' .*\n\n' | 66 'Thread <_PuppetThread\(foo, started daemon \d+\)> stacktrace:\n' |
63 'Thread <Thread\(bar, started \d+\)> stacktrace:\n' | 67 ' .*\n\n' |
64 ' .*\n$', re.DOTALL) | 68 'Thread <_PuppetThread\(bar, started daemon \d+\)> stacktrace:\n' |
69 ' .*\n$', re.DOTALL) | |
65 ) | 70 ) |
66 self.assertNotIn(' Thread stopped while acquiring stacktrace.\n', error_mes sage) | 71 self.assertNotIn(' Thread stopped while acquiring stacktrace.\n', |
72 error_message) | |
67 | 73 |
68 @mock.patch('unittest.TestCase.fail') | 74 def test_fail_get_stacktrace(self): |
69 def test_fail_get_stacktrace(self, fail_mock): | 75 self.watcher.setUp() |
70 with mock.patch('sys._current_frames', mock.Mock(return_value={})): | 76 self._threads.append(_PuppetThread('foo')) |
71 t = TestExtraThreads('mock_test') | 77 try: |
72 t.run() | 78 old = sys._current_frames |
73 t.stopThreads() | 79 sys._current_frames = lambda: {} |
80 self.watcher.tearDown() | |
81 finally: | |
82 sys._current_frames = old | |
74 | 83 |
75 self.assertEqual(len(fail_mock.call_args_list), 1) | 84 self.assertEqual(len(self._fail_called), 1) |
76 self.assertEqual(len(fail_mock.call_args_list[0]), 2) | 85 error_message = self._fail_called[0] |
77 self.assertEqual(len(fail_mock.call_args_list[0][0]), 1) | |
78 self.assertEqual(len(fail_mock.call_args_list[0][1]), 0) | |
79 | |
80 error_message = fail_mock.call_args_list[0][0][0] | |
81 self.assertIn(' Thread stopped while acquiring stacktrace.\n', error_messag e) | 86 self.assertIn(' Thread stopped while acquiring stacktrace.\n', error_messag e) |
82 | 87 |
83 | 88 |
89 class ThreadWatcherTestCaseUsageTest(thread_watcher.TestCase): | |
tandrii(chromium)
2016/07/07 11:33:57
testing actual usage turned out to be very simple
Sergiy Byelozyorov
2016/07/08 08:22:13
Awesome. Thanks.
| |
90 @classmethod | |
91 def setUpClass(cls): | |
92 cls._threads = [] | |
93 | |
94 @classmethod | |
95 def tearDownClass(cls): | |
96 for t in cls._threads: | |
97 t.stop() | |
98 | |
99 @unittest.expectedFailure | |
100 def test_fail_extra_threads(self): | |
101 self._threads.append(_PuppetThread(name)) | |
102 | |
103 def test_ok(self): | |
104 pass | |
105 | |
106 def test_ok_with_threads(self): | |
107 self._threads.append(_PuppetThread('ok')) | |
108 self._threads[-1].stop() | |
109 | |
110 | |
111 | |
84 if __name__ == '__main__': | 112 if __name__ == '__main__': |
85 unittest.main() | 113 unittest.main() |
OLD | NEW |