| Index: testing_support/tests/thread_watcher_test.py
 | 
| diff --git a/testing_support/tests/thread_watcher_test.py b/testing_support/tests/thread_watcher_test.py
 | 
| new file mode 100644
 | 
| index 0000000000000000000000000000000000000000..7bf5aa62a9afd99ff15b54ad55a05f8fccd83334
 | 
| --- /dev/null
 | 
| +++ b/testing_support/tests/thread_watcher_test.py
 | 
| @@ -0,0 +1,85 @@
 | 
| +# Copyright 2016 The Chromium Authors. All rights reserved.
 | 
| +# Use of this source code is governed by a BSD-style license that can be
 | 
| +# found in the LICENSE file.
 | 
| +
 | 
| +import mock
 | 
| +import re
 | 
| +import threading
 | 
| +import time
 | 
| +import unittest
 | 
| +
 | 
| +from testing_support import thread_watcher
 | 
| +
 | 
| +class TestNoExtraThreads(thread_watcher.TestCase):
 | 
| +  def mock_test(self):
 | 
| +    pass
 | 
| +
 | 
| +
 | 
| +class TestExtraThreads(thread_watcher.TestCase):
 | 
| +  def _start_thread(self, name='foo'):
 | 
| +    stop = threading.Event()
 | 
| +    def thread_func():
 | 
| +      while not stop.is_set():
 | 
| +        time.sleep(0.01)
 | 
| +
 | 
| +    t = threading.Thread(target=thread_func, name=name)
 | 
| +    t.start()
 | 
| +    return t, stop
 | 
| +
 | 
| +  def mock_test(self):
 | 
| +    self.t1, self.stop1 = self._start_thread('foo')
 | 
| +    self.t2, self.stop2 = self._start_thread('bar')
 | 
| +
 | 
| +  def stopThreads(self):
 | 
| +    self.stop1.set()
 | 
| +    self.stop2.set()
 | 
| +    self.t1.join()
 | 
| +    self.t2.join()
 | 
| +
 | 
| +
 | 
| +class ThreadWatcherTestCase(thread_watcher.TestCase):
 | 
| +  @mock.patch('unittest.TestCase.fail')
 | 
| +  def test_no_extra_threads(self, fail_mock):
 | 
| +    TestNoExtraThreads('mock_test').run()
 | 
| +    self.assertFalse(fail_mock.called)
 | 
| +
 | 
| +  @mock.patch('unittest.TestCase.fail')
 | 
| +  def test_extra_threads(self, fail_mock):
 | 
| +    t = TestExtraThreads('mock_test')
 | 
| +    t.run()
 | 
| +    t.stopThreads()
 | 
| +
 | 
| +    self.assertEqual(len(fail_mock.call_args_list), 1)
 | 
| +    self.assertEqual(len(fail_mock.call_args_list[0]), 2)
 | 
| +    self.assertEqual(len(fail_mock.call_args_list[0][0]), 1)
 | 
| +    self.assertEqual(len(fail_mock.call_args_list[0][1]), 0)
 | 
| +
 | 
| +    error_message = fail_mock.call_args_list[0][0][0]
 | 
| +    self.assertRegexpMatches(
 | 
| +        error_message,
 | 
| +        re.compile('^Found 2 running thread\(s\) after the test.\n\n'
 | 
| +                   'Thread <Thread\(foo, started \d+\)> stacktrace:\n'
 | 
| +                   '  .*\n\n'
 | 
| +                   'Thread <Thread\(bar, started \d+\)> stacktrace:\n'
 | 
| +                   '  .*\n$', re.DOTALL)
 | 
| +    )
 | 
| +    self.assertNotIn('  Thread stopped while acquiring stacktrace.\n', error_message)
 | 
| +
 | 
| +  @mock.patch('unittest.TestCase.fail')
 | 
| +  def test_fail_get_stacktrace(self, fail_mock):
 | 
| +    with mock.patch('sys._current_frames', mock.Mock(return_value={})):
 | 
| +      t = TestExtraThreads('mock_test')
 | 
| +      t.run()
 | 
| +      t.stopThreads()
 | 
| +
 | 
| +    self.assertEqual(len(fail_mock.call_args_list), 1)
 | 
| +    self.assertEqual(len(fail_mock.call_args_list[0]), 2)
 | 
| +    self.assertEqual(len(fail_mock.call_args_list[0][0]), 1)
 | 
| +    self.assertEqual(len(fail_mock.call_args_list[0][1]), 0)
 | 
| +
 | 
| +    error_message = fail_mock.call_args_list[0][0][0]
 | 
| +    self.assertIn('  Thread stopped while acquiring stacktrace.\n', error_message)
 | 
| +
 | 
| +
 | 
| +if __name__ == '__main__':
 | 
| +  unittest.main()
 | 
| 
 |