Index: testing_support/tests/thread_watcher_test.py |
diff --git a/testing_support/tests/thread_watcher_test.py b/testing_support/tests/thread_watcher_test.py |
index 7bf5aa62a9afd99ff15b54ad55a05f8fccd83334..4989f2d8311a5625b9b4c84ef36e9bff95057875 100644 |
--- a/testing_support/tests/thread_watcher_test.py |
+++ b/testing_support/tests/thread_watcher_test.py |
@@ -2,83 +2,111 @@ |
# Use of this source code is governed by a BSD-style license that can be |
# found in the LICENSE file. |
-import mock |
import re |
+import sys |
import threading |
-import time |
import unittest |
from testing_support import thread_watcher |
-class TestNoExtraThreads(thread_watcher.TestCase): |
- def mock_test(self): |
- pass |
- |
- |
-class TestExtraThreads(thread_watcher.TestCase): |
- def _start_thread(self, name='foo'): |
- stop = threading.Event() |
- def thread_func(): |
- while not stop.is_set(): |
- time.sleep(0.01) |
- t = threading.Thread(target=thread_func, name=name) |
- t.start() |
- return t, stop |
+class _PuppetThread(threading.Thread): |
+ def __init__(self, name): |
+ super(_PuppetThread, self).__init__(name=name) |
+ self.daemon = True |
+ self._start_event = threading.Event() |
+ self._stop_event = threading.Event() |
+ self.start() |
+ # For the thread to actually start. |
+ self._start_event.wait() |
- def mock_test(self): |
- self.t1, self.stop1 = self._start_thread('foo') |
- self.t2, self.stop2 = self._start_thread('bar') |
+ def run(self): |
+ self._start_event.set() |
+ self._stop_event.wait() |
- def stopThreads(self): |
- self.stop1.set() |
- self.stop2.set() |
- self.t1.join() |
- self.t2.join() |
+ def stop(self): |
+ self._stop_event.set() |
+ self.join() |
class ThreadWatcherTestCase(thread_watcher.TestCase): |
- @mock.patch('unittest.TestCase.fail') |
- def test_no_extra_threads(self, fail_mock): |
- TestNoExtraThreads('mock_test').run() |
- self.assertFalse(fail_mock.called) |
- |
- @mock.patch('unittest.TestCase.fail') |
- def test_extra_threads(self, fail_mock): |
- t = TestExtraThreads('mock_test') |
- t.run() |
- t.stopThreads() |
- |
- self.assertEqual(len(fail_mock.call_args_list), 1) |
- self.assertEqual(len(fail_mock.call_args_list[0]), 2) |
- self.assertEqual(len(fail_mock.call_args_list[0][0]), 1) |
- self.assertEqual(len(fail_mock.call_args_list[0][1]), 0) |
- |
- error_message = fail_mock.call_args_list[0][0][0] |
+ def setUp(self): |
+ self._fail_called = [] |
+ self.watcher = thread_watcher.ThreadWatcherMixIn() |
+ self.watcher.fail = lambda x: self._fail_called.append(x) |
+ self._threads = [] |
+ |
+ def tearDown(self): |
+ for t in self._threads: |
+ t.stop() |
+ |
+ def test_no_extra_threads(self): |
+ self.watcher.setUp() |
+ self.watcher.tearDown() |
+ self.assertEqual(self._fail_called, []) |
+ |
+ def test_no_extra_threads_start_stop(self): |
+ self.watcher.setUp() |
+ self._threads.append(_PuppetThread('puppet1')) |
+ self._threads[-1].stop() |
+ self.watcher.tearDown() |
+ self.assertEqual(self._fail_called, []) |
+ |
+ def test_extra_threads(self): |
+ self.watcher.setUp() |
+ self._threads.append(_PuppetThread('foo')) |
+ self._threads.append(_PuppetThread('bar')) |
+ self.watcher.tearDown() |
+ self.assertEqual(len(self._fail_called), 1) |
+ error_message = self._fail_called[0] |
self.assertRegexpMatches( |
error_message, |
- re.compile('^Found 2 running thread\(s\) after the test.\n\n' |
- 'Thread <Thread\(foo, started \d+\)> stacktrace:\n' |
- ' .*\n\n' |
- 'Thread <Thread\(bar, started \d+\)> stacktrace:\n' |
- ' .*\n$', re.DOTALL) |
+ re.compile( |
+ '^Found 2 running thread\(s\) after the test.\n\n' |
+ 'Thread <_PuppetThread\(foo, started daemon \d+\)> stacktrace:\n' |
+ ' .*\n\n' |
+ 'Thread <_PuppetThread\(bar, started daemon \d+\)> stacktrace:\n' |
+ ' .*\n$', re.DOTALL) |
) |
- self.assertNotIn(' Thread stopped while acquiring stacktrace.\n', error_message) |
+ self.assertNotIn(' Thread stopped while acquiring stacktrace.\n', |
+ error_message) |
+ |
+ def test_fail_get_stacktrace(self): |
+ self.watcher.setUp() |
+ self._threads.append(_PuppetThread('foo')) |
+ try: |
+ old = sys._current_frames |
+ sys._current_frames = lambda: {} |
+ self.watcher.tearDown() |
+ finally: |
+ sys._current_frames = old |
+ |
+ self.assertEqual(len(self._fail_called), 1) |
+ error_message = self._fail_called[0] |
+ self.assertIn(' Thread stopped while acquiring stacktrace.\n', error_message) |
- @mock.patch('unittest.TestCase.fail') |
- def test_fail_get_stacktrace(self, fail_mock): |
- with mock.patch('sys._current_frames', mock.Mock(return_value={})): |
- t = TestExtraThreads('mock_test') |
- t.run() |
- t.stopThreads() |
- self.assertEqual(len(fail_mock.call_args_list), 1) |
- self.assertEqual(len(fail_mock.call_args_list[0]), 2) |
- self.assertEqual(len(fail_mock.call_args_list[0][0]), 1) |
- self.assertEqual(len(fail_mock.call_args_list[0][1]), 0) |
+class ThreadWatcherTestCaseUsageTest(thread_watcher.TestCase): |
tandrii(chromium)
2016/07/07 11:33:57
testing actual usage turned out to be very simple
Sergiy Byelozyorov
2016/07/08 08:22:13
Awesome. Thanks.
|
+ @classmethod |
+ def setUpClass(cls): |
+ cls._threads = [] |
+ |
+ @classmethod |
+ def tearDownClass(cls): |
+ for t in cls._threads: |
+ t.stop() |
+ |
+ @unittest.expectedFailure |
+ def test_fail_extra_threads(self): |
+ self._threads.append(_PuppetThread(name)) |
+ |
+ def test_ok(self): |
+ pass |
+ |
+ def test_ok_with_threads(self): |
+ self._threads.append(_PuppetThread('ok')) |
+ self._threads[-1].stop() |
- error_message = fail_mock.call_args_list[0][0][0] |
- self.assertIn(' Thread stopped while acquiring stacktrace.\n', error_message) |
if __name__ == '__main__': |