Chromium Code Reviews| Index: testing_support/tests/thread_watcher_test.py |
| diff --git a/testing_support/tests/thread_watcher_test.py b/testing_support/tests/thread_watcher_test.py |
| index 7bf5aa62a9afd99ff15b54ad55a05f8fccd83334..4989f2d8311a5625b9b4c84ef36e9bff95057875 100644 |
| --- a/testing_support/tests/thread_watcher_test.py |
| +++ b/testing_support/tests/thread_watcher_test.py |
| @@ -2,83 +2,111 @@ |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| -import mock |
| import re |
| +import sys |
| import threading |
| -import time |
| import unittest |
| from testing_support import thread_watcher |
| -class TestNoExtraThreads(thread_watcher.TestCase): |
| - def mock_test(self): |
| - pass |
| - |
| - |
| -class TestExtraThreads(thread_watcher.TestCase): |
| - def _start_thread(self, name='foo'): |
| - stop = threading.Event() |
| - def thread_func(): |
| - while not stop.is_set(): |
| - time.sleep(0.01) |
| - t = threading.Thread(target=thread_func, name=name) |
| - t.start() |
| - return t, stop |
| +class _PuppetThread(threading.Thread): |
| + def __init__(self, name): |
| + super(_PuppetThread, self).__init__(name=name) |
| + self.daemon = True |
| + self._start_event = threading.Event() |
| + self._stop_event = threading.Event() |
| + self.start() |
| + # For the thread to actually start. |
| + self._start_event.wait() |
| - def mock_test(self): |
| - self.t1, self.stop1 = self._start_thread('foo') |
| - self.t2, self.stop2 = self._start_thread('bar') |
| + def run(self): |
| + self._start_event.set() |
| + self._stop_event.wait() |
| - def stopThreads(self): |
| - self.stop1.set() |
| - self.stop2.set() |
| - self.t1.join() |
| - self.t2.join() |
| + def stop(self): |
| + self._stop_event.set() |
| + self.join() |
| class ThreadWatcherTestCase(thread_watcher.TestCase): |
| - @mock.patch('unittest.TestCase.fail') |
| - def test_no_extra_threads(self, fail_mock): |
| - TestNoExtraThreads('mock_test').run() |
| - self.assertFalse(fail_mock.called) |
| - |
| - @mock.patch('unittest.TestCase.fail') |
| - def test_extra_threads(self, fail_mock): |
| - t = TestExtraThreads('mock_test') |
| - t.run() |
| - t.stopThreads() |
| - |
| - self.assertEqual(len(fail_mock.call_args_list), 1) |
| - self.assertEqual(len(fail_mock.call_args_list[0]), 2) |
| - self.assertEqual(len(fail_mock.call_args_list[0][0]), 1) |
| - self.assertEqual(len(fail_mock.call_args_list[0][1]), 0) |
| - |
| - error_message = fail_mock.call_args_list[0][0][0] |
| + def setUp(self): |
| + self._fail_called = [] |
| + self.watcher = thread_watcher.ThreadWatcherMixIn() |
| + self.watcher.fail = lambda x: self._fail_called.append(x) |
| + self._threads = [] |
| + |
| + def tearDown(self): |
| + for t in self._threads: |
| + t.stop() |
| + |
| + def test_no_extra_threads(self): |
| + self.watcher.setUp() |
| + self.watcher.tearDown() |
| + self.assertEqual(self._fail_called, []) |
| + |
| + def test_no_extra_threads_start_stop(self): |
| + self.watcher.setUp() |
| + self._threads.append(_PuppetThread('puppet1')) |
| + self._threads[-1].stop() |
| + self.watcher.tearDown() |
| + self.assertEqual(self._fail_called, []) |
| + |
| + def test_extra_threads(self): |
| + self.watcher.setUp() |
| + self._threads.append(_PuppetThread('foo')) |
| + self._threads.append(_PuppetThread('bar')) |
| + self.watcher.tearDown() |
| + self.assertEqual(len(self._fail_called), 1) |
| + error_message = self._fail_called[0] |
| self.assertRegexpMatches( |
| error_message, |
| - re.compile('^Found 2 running thread\(s\) after the test.\n\n' |
| - 'Thread <Thread\(foo, started \d+\)> stacktrace:\n' |
| - ' .*\n\n' |
| - 'Thread <Thread\(bar, started \d+\)> stacktrace:\n' |
| - ' .*\n$', re.DOTALL) |
| + re.compile( |
| + '^Found 2 running thread\(s\) after the test.\n\n' |
| + 'Thread <_PuppetThread\(foo, started daemon \d+\)> stacktrace:\n' |
| + ' .*\n\n' |
| + 'Thread <_PuppetThread\(bar, started daemon \d+\)> stacktrace:\n' |
| + ' .*\n$', re.DOTALL) |
| ) |
| - self.assertNotIn(' Thread stopped while acquiring stacktrace.\n', error_message) |
| + self.assertNotIn(' Thread stopped while acquiring stacktrace.\n', |
| + error_message) |
| + |
| + def test_fail_get_stacktrace(self): |
| + self.watcher.setUp() |
| + self._threads.append(_PuppetThread('foo')) |
| + try: |
| + old = sys._current_frames |
| + sys._current_frames = lambda: {} |
| + self.watcher.tearDown() |
| + finally: |
| + sys._current_frames = old |
| + |
| + self.assertEqual(len(self._fail_called), 1) |
| + error_message = self._fail_called[0] |
| + self.assertIn(' Thread stopped while acquiring stacktrace.\n', error_message) |
| - @mock.patch('unittest.TestCase.fail') |
| - def test_fail_get_stacktrace(self, fail_mock): |
| - with mock.patch('sys._current_frames', mock.Mock(return_value={})): |
| - t = TestExtraThreads('mock_test') |
| - t.run() |
| - t.stopThreads() |
| - self.assertEqual(len(fail_mock.call_args_list), 1) |
| - self.assertEqual(len(fail_mock.call_args_list[0]), 2) |
| - self.assertEqual(len(fail_mock.call_args_list[0][0]), 1) |
| - self.assertEqual(len(fail_mock.call_args_list[0][1]), 0) |
| +class ThreadWatcherTestCaseUsageTest(thread_watcher.TestCase): |
|
tandrii(chromium)
2016/07/07 11:33:57
testing actual usage turned out to be very simple
Sergiy Byelozyorov
2016/07/08 08:22:13
Awesome. Thanks.
|
| + @classmethod |
| + def setUpClass(cls): |
| + cls._threads = [] |
| + |
| + @classmethod |
| + def tearDownClass(cls): |
| + for t in cls._threads: |
| + t.stop() |
| + |
| + @unittest.expectedFailure |
| + def test_fail_extra_threads(self): |
| + self._threads.append(_PuppetThread(name)) |
| + |
| + def test_ok(self): |
| + pass |
| + |
| + def test_ok_with_threads(self): |
| + self._threads.append(_PuppetThread('ok')) |
| + self._threads[-1].stop() |
| - error_message = fail_mock.call_args_list[0][0][0] |
| - self.assertIn(' Thread stopped while acquiring stacktrace.\n', error_message) |
| if __name__ == '__main__': |