Index: tests/run_test_cases_test.py |
diff --git a/tests/run_test_cases_test.py b/tests/run_test_cases_test.py |
new file mode 100755 |
index 0000000000000000000000000000000000000000..4e706a94f77a3015f7d04d26f5518683a528b740 |
--- /dev/null |
+++ b/tests/run_test_cases_test.py |
@@ -0,0 +1,193 @@ |
+#!/usr/bin/env python |
+# Copyright (c) 2012 The Chromium Authors. All rights reserved. |
+# Use of this source code is governed by a BSD-style license that can be |
+# found in the LICENSE file. |
+ |
+import logging |
+import os |
+import sys |
+import unittest |
+ |
+ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) |
+sys.path.insert(0, ROOT_DIR) |
+ |
+import run_test_cases |
+ |
+SLEEP = os.path.join(ROOT_DIR, 'tests', 'run_test_cases', 'sleep.py') |
+ |
+ |
+def to_native_eol(string): |
+ if sys.platform == 'win32': |
+ return string.replace('\n', '\r\n') |
+ return string |
+ |
+ |
+class ListTestCasesTest(unittest.TestCase): |
+ def test_shards(self): |
+ test_cases = ( |
+ (range(10), 10, 0, 1), |
+ |
+ ([0, 1], 5, 0, 3), |
+ ([2, 3], 5, 1, 3), |
+ ([4 ], 5, 2, 3), |
+ |
+ ([0], 5, 0, 7), |
+ ([1], 5, 1, 7), |
+ ([2], 5, 2, 7), |
+ ([3], 5, 3, 7), |
+ ([4], 5, 4, 7), |
+ ([ ], 5, 5, 7), |
+ ([ ], 5, 6, 7), |
+ |
+ ([0, 1], 4, 0, 2), |
+ ([2, 3], 4, 1, 2), |
+ ) |
+ for expected, range_length, index, shards in test_cases: |
+ result = run_test_cases.filter_shards(range(range_length), index, shards) |
+ self.assertEquals( |
+ expected, result, (result, expected, range_length, index, shards)) |
+ |
+ |
+class RunTestCases(unittest.TestCase): |
+ def test_call(self): |
+ cmd = [sys.executable, SLEEP, '0.001'] |
+ # 0 means no timeout, like None. |
+ output, code = run_test_cases.call_with_timeout(cmd, 0) |
+ self.assertEquals(to_native_eol('Sleeping.\nSlept.\n'), output) |
+ self.assertEquals(0, code) |
+ |
+ def test_call_eol(self): |
+ cmd = [sys.executable, SLEEP, '0.001'] |
+ # 0 means no timeout, like None. |
+ output, code = run_test_cases.call_with_timeout( |
+ cmd, 0, universal_newlines=True) |
+ self.assertEquals('Sleeping.\nSlept.\n', output) |
+ self.assertEquals(0, code) |
+ |
+ def test_call_timed_out_kill(self): |
+ cmd = [sys.executable, SLEEP, '100'] |
+ # On a loaded system, this can be tight. |
+ output, code = run_test_cases.call_with_timeout(cmd, timeout=1) |
+ self.assertEquals(to_native_eol('Sleeping.\n'), output) |
+ if sys.platform == 'win32': |
+ self.assertEquals(1, code) |
+ else: |
+ self.assertEquals(-9, code) |
+ |
+ def test_call_timed_out_kill_eol(self): |
+ cmd = [sys.executable, SLEEP, '100'] |
+ # On a loaded system, this can be tight. |
+ output, code = run_test_cases.call_with_timeout( |
+ cmd, timeout=1, universal_newlines=True) |
+ self.assertEquals('Sleeping.\n', output) |
+ if sys.platform == 'win32': |
+ self.assertEquals(1, code) |
+ else: |
+ self.assertEquals(-9, code) |
+ |
+ def test_call_timeout_no_kill(self): |
+ cmd = [sys.executable, SLEEP, '0.001'] |
+ output, code = run_test_cases.call_with_timeout(cmd, timeout=100) |
+ self.assertEquals(to_native_eol('Sleeping.\nSlept.\n'), output) |
+ self.assertEquals(0, code) |
+ |
+ def test_call_timeout_no_kill_eol(self): |
+ cmd = [sys.executable, SLEEP, '0.001'] |
+ output, code = run_test_cases.call_with_timeout( |
+ cmd, timeout=100, universal_newlines=True) |
+ self.assertEquals('Sleeping.\nSlept.\n', output) |
+ self.assertEquals(0, code) |
+ |
+ def test_gtest_filter(self): |
+ old = run_test_cases.run_test_cases |
+ exe = os.path.join(ROOT_DIR, 'tests', 'gtest_fake', 'gtest_fake_pass.py') |
+ def expect(executable, test_cases, jobs, timeout, run_all, result_file): |
+ self.assertEquals(run_test_cases.fix_python_path([exe]), executable) |
+ self.assertEquals(['Foo.Bar1', 'Foo.Bar3'], test_cases) |
+ self.assertEquals(run_test_cases.num_processors(), jobs) |
+ self.assertEquals(120, timeout) |
+ self.assertEquals(False, run_all) |
+ self.assertEquals(exe + '.run_test_cases', result_file) |
+ return 89 |
+ |
+ try: |
+ run_test_cases.run_test_cases = expect |
+ result = run_test_cases.main([exe, '--gtest_filter=Foo.Bar*-*.Bar2']) |
+ self.assertEquals(89, result) |
+ finally: |
+ run_test_cases.run_test_cases = old |
+ |
+ def testRunSome(self): |
+ tests = [ |
+ # Try with named arguments. Accepts 3*1 failures. |
+ ( |
+ run_test_cases.RunSome( |
+ expected_count=10, |
+ retries=3, |
+ min_failures=1, |
+ max_failure_ratio=0.001), |
+ [False] * 4), |
+ # Same without named arguments. |
+ (run_test_cases.RunSome( 10, 3, 1, 0.001), [False] * 4), |
+ |
+ (run_test_cases.RunSome( 10, 1, 1, 0.001), [False] * 2), |
+ (run_test_cases.RunSome( 10, 1, 1, 0.010), [False] * 2), |
+ |
+ # For low expected_count value, retries * min_failures is the minimum |
+ # bound of accepted failures. |
+ (run_test_cases.RunSome( 10, 3, 1, 0.010), [False] * 4), |
+ (run_test_cases.RunSome( 10, 3, 1, 0.020), [False] * 4), |
+ (run_test_cases.RunSome( 10, 3, 1, 0.050), [False] * 4), |
+ (run_test_cases.RunSome( 10, 3, 1, 0.100), [False] * 4), |
+ (run_test_cases.RunSome( 10, 3, 1, 0.110), [False] * 4), |
+ |
+ # Allows expected_count + retries failures at maximum. |
+ (run_test_cases.RunSome( 10, 3, 1, 0.200), [False] * 6), |
+ (run_test_cases.RunSome( 10, 3, 1, 0.999), [False] * 30), |
+ |
+ # The asympthote is nearing max_failure_ratio for large expected_count |
+ # values. |
+ (run_test_cases.RunSome(1000, 3, 1, 0.050), [False] * 150), |
+ ] |
+ for index, (decider, rounds) in enumerate(tests): |
+ for index2, r in enumerate(rounds): |
+ self.assertFalse(decider.should_stop(), (index, index2, str(decider))) |
+ decider.got_result(r) |
+ self.assertTrue(decider.should_stop(), (index, str(decider))) |
+ |
+ def testStatsInfinite(self): |
+ decider = run_test_cases.RunAll() |
+ for _ in xrange(200): |
+ self.assertFalse(decider.should_stop()) |
+ decider.got_result(False) |
+ |
+ |
+class WorkerPoolTest(unittest.TestCase): |
+ def test_normal(self): |
+ mapper = lambda value: -value |
+ with run_test_cases.ThreadPool(8) as pool: |
+ for i in range(32): |
+ pool.add_task(mapper, i) |
+ results = pool.join() |
+ self.assertEquals(range(-31, 1), sorted(results)) |
+ |
+ def test_exception(self): |
+ class FearsomeException(Exception): |
+ pass |
+ def mapper(value): |
+ raise FearsomeException(value) |
+ task_added = False |
+ try: |
+ with run_test_cases.ThreadPool(8) as pool: |
+ pool.add_task(mapper, 0) |
+ task_added = True |
+ pool.join() |
+ self.fail() |
+ except FearsomeException: |
+ self.assertEquals(True, task_added) |
+ |
+ |
+if __name__ == '__main__': |
+ VERBOSE = '-v' in sys.argv |
+ logging.basicConfig(level=logging.DEBUG if VERBOSE else logging.ERROR) |
+ unittest.main() |