| Index: testing_support/tests/thread_watcher_test.py
|
| diff --git a/testing_support/tests/thread_watcher_test.py b/testing_support/tests/thread_watcher_test.py
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..7bf5aa62a9afd99ff15b54ad55a05f8fccd83334
|
| --- /dev/null
|
| +++ b/testing_support/tests/thread_watcher_test.py
|
| @@ -0,0 +1,85 @@
|
| +# Copyright 2016 The Chromium Authors. All rights reserved.
|
| +# Use of this source code is governed by a BSD-style license that can be
|
| +# found in the LICENSE file.
|
| +
|
| +import mock
|
| +import re
|
| +import threading
|
| +import time
|
| +import unittest
|
| +
|
| +from testing_support import thread_watcher
|
| +
|
| +class TestNoExtraThreads(thread_watcher.TestCase):
|
| + def mock_test(self):
|
| + pass
|
| +
|
| +
|
| +class TestExtraThreads(thread_watcher.TestCase):
|
| + def _start_thread(self, name='foo'):
|
| + stop = threading.Event()
|
| + def thread_func():
|
| + while not stop.is_set():
|
| + time.sleep(0.01)
|
| +
|
| + t = threading.Thread(target=thread_func, name=name)
|
| + t.start()
|
| + return t, stop
|
| +
|
| + def mock_test(self):
|
| + self.t1, self.stop1 = self._start_thread('foo')
|
| + self.t2, self.stop2 = self._start_thread('bar')
|
| +
|
| + def stopThreads(self):
|
| + self.stop1.set()
|
| + self.stop2.set()
|
| + self.t1.join()
|
| + self.t2.join()
|
| +
|
| +
|
| +class ThreadWatcherTestCase(thread_watcher.TestCase):
|
| + @mock.patch('unittest.TestCase.fail')
|
| + def test_no_extra_threads(self, fail_mock):
|
| + TestNoExtraThreads('mock_test').run()
|
| + self.assertFalse(fail_mock.called)
|
| +
|
| + @mock.patch('unittest.TestCase.fail')
|
| + def test_extra_threads(self, fail_mock):
|
| + t = TestExtraThreads('mock_test')
|
| + t.run()
|
| + t.stopThreads()
|
| +
|
| + self.assertEqual(len(fail_mock.call_args_list), 1)
|
| + self.assertEqual(len(fail_mock.call_args_list[0]), 2)
|
| + self.assertEqual(len(fail_mock.call_args_list[0][0]), 1)
|
| + self.assertEqual(len(fail_mock.call_args_list[0][1]), 0)
|
| +
|
| + error_message = fail_mock.call_args_list[0][0][0]
|
| + self.assertRegexpMatches(
|
| + error_message,
|
| + re.compile('^Found 2 running thread\(s\) after the test.\n\n'
|
| + 'Thread <Thread\(foo, started \d+\)> stacktrace:\n'
|
| + ' .*\n\n'
|
| + 'Thread <Thread\(bar, started \d+\)> stacktrace:\n'
|
| + ' .*\n$', re.DOTALL)
|
| + )
|
| + self.assertNotIn(' Thread stopped while acquiring stacktrace.\n', error_message)
|
| +
|
| + @mock.patch('unittest.TestCase.fail')
|
| + def test_fail_get_stacktrace(self, fail_mock):
|
| + with mock.patch('sys._current_frames', mock.Mock(return_value={})):
|
| + t = TestExtraThreads('mock_test')
|
| + t.run()
|
| + t.stopThreads()
|
| +
|
| + self.assertEqual(len(fail_mock.call_args_list), 1)
|
| + self.assertEqual(len(fail_mock.call_args_list[0]), 2)
|
| + self.assertEqual(len(fail_mock.call_args_list[0][0]), 1)
|
| + self.assertEqual(len(fail_mock.call_args_list[0][1]), 0)
|
| +
|
| + error_message = fail_mock.call_args_list[0][0][0]
|
| + self.assertIn(' Thread stopped while acquiring stacktrace.\n', error_message)
|
| +
|
| +
|
| +if __name__ == '__main__':
|
| + unittest.main()
|
|
|