Index: testing_support/tests/thread_watcher_test.py |
diff --git a/testing_support/tests/thread_watcher_test.py b/testing_support/tests/thread_watcher_test.py |
new file mode 100644 |
index 0000000000000000000000000000000000000000..7bf5aa62a9afd99ff15b54ad55a05f8fccd83334 |
--- /dev/null |
+++ b/testing_support/tests/thread_watcher_test.py |
@@ -0,0 +1,85 @@ |
+# Copyright 2016 The Chromium Authors. All rights reserved. |
+# Use of this source code is governed by a BSD-style license that can be |
+# found in the LICENSE file. |
+ |
+import mock |
+import re |
+import threading |
+import time |
+import unittest |
+ |
+from testing_support import thread_watcher |
+ |
+class TestNoExtraThreads(thread_watcher.TestCase): |
+ def mock_test(self): |
+ pass |
+ |
+ |
+class TestExtraThreads(thread_watcher.TestCase): |
+ def _start_thread(self, name='foo'): |
+ stop = threading.Event() |
+ def thread_func(): |
+ while not stop.is_set(): |
+ time.sleep(0.01) |
+ |
+ t = threading.Thread(target=thread_func, name=name) |
+ t.start() |
+ return t, stop |
+ |
+ def mock_test(self): |
+ self.t1, self.stop1 = self._start_thread('foo') |
+ self.t2, self.stop2 = self._start_thread('bar') |
+ |
+ def stopThreads(self): |
+ self.stop1.set() |
+ self.stop2.set() |
+ self.t1.join() |
+ self.t2.join() |
+ |
+ |
+class ThreadWatcherTestCase(thread_watcher.TestCase): |
+ @mock.patch('unittest.TestCase.fail') |
+ def test_no_extra_threads(self, fail_mock): |
+ TestNoExtraThreads('mock_test').run() |
+ self.assertFalse(fail_mock.called) |
+ |
+ @mock.patch('unittest.TestCase.fail') |
+ def test_extra_threads(self, fail_mock): |
+ t = TestExtraThreads('mock_test') |
+ t.run() |
+ t.stopThreads() |
+ |
+ self.assertEqual(len(fail_mock.call_args_list), 1) |
+ self.assertEqual(len(fail_mock.call_args_list[0]), 2) |
+ self.assertEqual(len(fail_mock.call_args_list[0][0]), 1) |
+ self.assertEqual(len(fail_mock.call_args_list[0][1]), 0) |
+ |
+ error_message = fail_mock.call_args_list[0][0][0] |
+ self.assertRegexpMatches( |
+ error_message, |
+ re.compile('^Found 2 running thread\(s\) after the test.\n\n' |
+ 'Thread <Thread\(foo, started \d+\)> stacktrace:\n' |
+ ' .*\n\n' |
+ 'Thread <Thread\(bar, started \d+\)> stacktrace:\n' |
+ ' .*\n$', re.DOTALL) |
+ ) |
+ self.assertNotIn(' Thread stopped while acquiring stacktrace.\n', error_message) |
+ |
+ @mock.patch('unittest.TestCase.fail') |
+ def test_fail_get_stacktrace(self, fail_mock): |
+ with mock.patch('sys._current_frames', mock.Mock(return_value={})): |
+ t = TestExtraThreads('mock_test') |
+ t.run() |
+ t.stopThreads() |
+ |
+ self.assertEqual(len(fail_mock.call_args_list), 1) |
+ self.assertEqual(len(fail_mock.call_args_list[0]), 2) |
+ self.assertEqual(len(fail_mock.call_args_list[0][0]), 1) |
+ self.assertEqual(len(fail_mock.call_args_list[0][1]), 0) |
+ |
+ error_message = fail_mock.call_args_list[0][0][0] |
+ self.assertIn(' Thread stopped while acquiring stacktrace.\n', error_message) |
+ |
+ |
+if __name__ == '__main__': |
+ unittest.main() |