Index: tools/nocompile_driver.py |
diff --git a/tools/nocompile_driver.py b/tools/nocompile_driver.py |
new file mode 100755 |
index 0000000000000000000000000000000000000000..2c5b354632426c38045251a36c1e35e591d83742 |
--- /dev/null |
+++ b/tools/nocompile_driver.py |
@@ -0,0 +1,472 @@ |
+#!/usr/bin/python |
+# Copyright (c) 2011 The Chromium Authors. All rights reserved. |
+# Use of this source code is governed by a BSD-style license that can be |
+# found in the LICENSE file. |
+ |
+"""Implements a simple "negative compile" test for C++ on linux. |
+ |
+Sometimes a C++ API needs to ensure that various usages cannot compile. To |
+enable unittesting of these assertions, we use this python script to |
+invoke gcc on a source file and assert that compilation fails. |
+ |
+For more info, see: |
+ http://dev.chromium.org/developers/testing/no-compile-tests |
+""" |
+ |
+import ast |
+import locale |
+import os |
+import re |
+import select |
+import shlex |
+import subprocess |
+import sys |
+import time |
+ |
+ |
+# Matches lines that start with #if and have the substring TEST in the |
+# conditional. Also extracts the comment. This allows us to search for |
+# lines like the following: |
+# |
+# #ifdef NCTEST_NAME_OF_TEST // [r'expected output'] |
+# #if defined(NCTEST_NAME_OF_TEST) // [r'expected output'] |
+# #if NCTEST_NAME_OF_TEST // [r'expected output'] |
+# #elif NCTEST_NAME_OF_TEST // [r'expected output'] |
+# #elif DISABLED_NCTEST_NAME_OF_TEST // [r'expected output'] |
+# |
+# inside the unittest file. |
+NCTEST_CONFIG_RE = re.compile(r'^#(?:el)?if.*\s+(\S*NCTEST\S*)\s*(//.*)?') |
+ |
+ |
+# Matches and removes the defined() preprocesor predicate. This is useful |
+# for test cases that use the preprocessor if-statement form: |
+# |
+# #if defined(NCTEST_NAME_OF_TEST) |
+# |
+# Should be used to post-process the results found by NCTEST_CONFIG_RE. |
+STRIP_DEFINED_RE = re.compile(r'defined\((.*)\)') |
+ |
+ |
+# Used to grab the expectation from comment at the end of an #ifdef. See |
+# NCTEST_CONFIG_RE's comment for examples of what the format should look like. |
+# |
+# The extracted substring should be a python array of regular expressions. |
+EXTRACT_EXPECTATION_RE = re.compile(r'//\s*(\[.*\])') |
+ |
+ |
+# The header for the result file so that it can be compiled. |
+RESULT_FILE_HEADER = """ |
+// This file is generated by the no compile test from: |
+// %s |
+ |
+#include "base/logging.h" |
+#include "testing/gtest/include/gtest/gtest.h" |
+ |
+""" |
+ |
+ |
+# The GUnit test function to output on a successful test completion. |
+SUCCESS_GUNIT_TEMPLATE = """ |
+TEST(%s, %s) { |
+ LOG(INFO) << "Took %f secs. Started at %f, ended at %f"; |
+} |
+""" |
+ |
+# The GUnit test function to output for a disabled test. |
+DISABLED_GUNIT_TEMPLATE = """ |
+TEST(%s, %s) { } |
+""" |
+ |
+ |
+# Timeout constants. |
+NCTEST_TERMINATE_TIMEOUT_SEC = 60 |
+NCTEST_KILL_TIMEOUT_SEC = NCTEST_TERMINATE_TIMEOUT_SEC + 2 |
+BUSY_LOOP_MAX_TIME_SEC = NCTEST_KILL_TIMEOUT_SEC * 2 |
+ |
+ |
+def ValidateInput(parallelism, sourcefile_path, cflags, resultfile_path): |
+ """Make sure the arguments being passed in are sane.""" |
+ assert parallelism >= 1 |
+ assert type(sourcefile_path) is str |
+ assert type(cflags) is str |
+ assert type(resultfile_path) is str |
+ |
+ |
+def ParseExpectation(expectation_string): |
+ """Extracts expectation definition from the trailing comment on the ifdef. |
+ |
+ See the comment on NCTEST_CONFIG_RE for examples of the format we are parsing. |
+ |
+ Args: |
+ expectation_string: A string like "// [r'some_regex']" |
+ |
+ Returns: |
+ A list of compiled regular expressions indicating all possible valid |
+ compiler outputs. If the list is empty, all outputs are considered valid. |
+ """ |
+ assert expectation_string is not None |
+ |
+ match = EXTRACT_EXPECTATION_RE.match(expectation_string) |
+ assert match |
+ |
+ raw_expectation = ast.literal_eval(match.group(1)) |
+ assert type(raw_expectation) is list |
+ |
+ expectation = [] |
+ for regex_str in raw_expectation: |
+ assert type(regex_str) is str |
+ expectation.append(re.compile(regex_str)) |
+ return expectation |
+ |
+ |
+def ExtractTestConfigs(sourcefile_path): |
+ """Parses the soruce file for test configurations. |
+ |
+ Each no-compile test in the file is separated by an ifdef macro. We scan |
+ the source file with the NCTEST_CONFIG_RE to find all ifdefs that look like |
+ they demark one no-compile test and try to extract the test configuration |
+ from that. |
+ |
+ Args: |
+ sourcefile_path: The path to the source file. |
+ |
+ Returns: |
+ A list of test configurations. Each test configuration is a dictionary of |
+ the form: |
+ |
+ { name: 'NCTEST_NAME' |
+ suite_name: 'SOURCE_FILE_NAME' |
+ expectations: [re.Pattern, re.Pattern] } |
+ |
+ The |suite_name| is used to generate a pretty gtest output on successful |
+ completion of the no compile test. |
+ |
+ The compiled regexps in |expectations| define the valid outputs of the |
+ compiler. If any one of the listed patterns matches either the stderr or |
+ stdout from the compilation, and the compilation failed, then the test is |
+ considered to have succeeded. If the list is empty, than we ignore the |
+ compiler output and just check for failed compilation. If |expectations| |
+ is actually None, then this specifies a compiler sanity check test, which |
+ should expect a SUCCESSFUL compilation. |
+ """ |
+ sourcefile = open(sourcefile_path, 'r') |
+ |
+ # Convert filename from underscores to CamelCase. |
+ words = os.path.splitext(os.path.basename(sourcefile_path))[0].split('_') |
+ words = [w.capitalize() for w in words] |
+ suite_name = 'NoCompile' + ''.join(words) |
+ |
+ # Start with at least the compiler sanity test. You need to always have one |
+ # sanity test to show that compiler flags and configuration are not just |
+ # wrong. Otherwise, having a misconfigured compiler, or an error in the |
+ # shared portions of the .nc file would cause all tests to erroneously pass. |
+ test_configs = [{'name': 'NCTEST_SANITY', |
+ 'suite_name': suite_name, |
+ 'expectations': None}] |
+ |
+ for line in sourcefile: |
+ match_result = NCTEST_CONFIG_RE.match(line) |
+ if not match_result: |
+ continue |
+ |
+ groups = match_result.groups() |
+ |
+ # Grab the name and remove the defined() predicate if there is one. |
+ name = groups[0] |
+ strip_result = STRIP_DEFINED_RE.match(name) |
+ if strip_result: |
+ name = strip_result.group(1) |
+ |
+ # Read expectations if there are any. |
+ test_configs.append({'name': name, |
+ 'suite_name': suite_name, |
+ 'expectations': ParseExpectation(groups[1])}) |
+ sourcefile.close() |
+ return test_configs |
+ |
+ |
+def StartTest(sourcefile_path, cflags, config): |
+ """Start one negative compile test. |
+ |
+ Args: |
+ sourcefile_path: The path to the source file. |
+ cflags: A string with all the CFLAGS to give to gcc. This string will be |
+ split by shelex so be careful with escaping. |
+ config: A dictionary describing the test. See ExtractTestConfigs |
+ for a description of the config format. |
+ |
+ Returns: |
+ A dictionary containing all the information about the started test. The |
+ fields in the dictionary are as follows: |
+ { 'proc': A subprocess object representing the compiler run. |
+ 'cmdline': The exectued command line. |
+ 'name': The name of the test. |
+ 'suite_name': The suite name to use when generating the gunit test |
+ result. |
+ 'terminate_timeout': The timestamp in seconds since the epoch after |
+ which the test should be terminated. |
+ 'kill_timeout': The timestamp in seconds since the epoch after which |
+ the test should be given a hard kill signal. |
+ 'started_at': A timestamp in seconds since the epoch for when this test |
+ was started. |
+ 'aborted_at': A timestamp in seconds since the epoch for when this test |
+ was aborted. If the test completed successfully, |
+ this value is 0. |
+ 'finished_at': A timestamp in seconds since the epoch for when this |
+ test was successfully complete. If the test is aborted, |
+ or running, this value is 0. |
+ 'expectations': A dictionary with the test expectations. See |
+ ParseExpectation() for the structure. |
+ } |
+ """ |
+ # TODO(ajwong): Get the compiler from gyp. |
+ cmdline = ['g++'] |
+ cmdline.extend(shlex.split(cflags)) |
+ name = config['name'] |
+ expectations = config['expectations'] |
+ if expectations is not None: |
+ cmdline.append('-D%s' % name) |
+ cmdline.extend(['-o', '/dev/null', '-c', '-x', 'c++', sourcefile_path]) |
+ |
+ process = subprocess.Popen(cmdline, stdout=subprocess.PIPE, |
+ stderr=subprocess.PIPE) |
+ now = time.time() |
+ return {'proc': process, |
+ 'cmdline': ' '.join(cmdline), |
+ 'name': name, |
+ 'suite_name': config['suite_name'], |
+ 'terminate_timeout': now + NCTEST_TERMINATE_TIMEOUT_SEC, |
+ 'kill_timeout': now + NCTEST_KILL_TIMEOUT_SEC, |
+ 'started_at': now, |
+ 'aborted_at': 0, |
+ 'finished_at': 0, |
+ 'expectations': expectations} |
+ |
+ |
+def PassTest(resultfile, test): |
+ """Logs the result of a test started by StartTest(), or a disabled test |
+ configuration. |
+ |
+ Args: |
+ resultfile: File object for .cc file that results are written to. |
+ test: An instance of the dictionary returned by StartTest(), a |
+ configuration from ExtractTestConfigs(). |
+ """ |
+ # The 'started_at' key is only added if a test has been started. |
+ if 'started_at' in test: |
+ resultfile.write(SUCCESS_GUNIT_TEMPLATE % ( |
+ test['suite_name'], test['name'], |
+ test['finished_at'] - test['started_at'], |
+ test['started_at'], test['finished_at'])) |
+ else: |
+ resultfile.write(DISABLED_GUNIT_TEMPLATE % ( |
+ test['suite_name'], test['name'])) |
+ |
+ |
+def FailTest(resultfile, test, error, stdout=None, stderr=None): |
+ """Logs the result of a test started by StartTest() |
+ |
+ Args: |
+ resultfile: File object for .cc file that results are written to. |
+ test: An instance of the dictionary returned by StartTest() |
+ error: The printable reason for the failure. |
+ stdout: The test's output to stdout. |
+ stderr: The test's output to stderr. |
+ """ |
+ resultfile.write('#error %s Failed: %s\n' % (test['name'], error)) |
+ resultfile.write('#error compile line: %s\n' % test['cmdline']) |
+ if stdout and len(stdout) != 0: |
+ resultfile.write('#error %s stdout:\n' % test['name']) |
+ for line in stdout.split('\n'): |
+ resultfile.write('#error %s\n' % line) |
+ |
+ if stderr and len(stderr) != 0: |
+ resultfile.write('#error %s stderr:\n' % test['name']) |
+ for line in stderr.split('\n'): |
+ resultfile.write('#error %s\n' % line) |
+ resultfile.write('\n') |
+ |
+ |
+def WriteStats(resultfile, suite_name, timings): |
+ """Logs the peformance timings for each stage of the script into a fake test. |
+ |
+ Args: |
+ resultfile: File object for .cc file that results are written to. |
+ suite_name: The name of the GUnit suite this test belongs to. |
+ timings: Dictionary with timestamps for each stage of the script run. |
+ """ |
+ stats_template = ("Started %f, Ended %f, Total %fs, Extract %fs, " |
+ "Compile %fs, Process %fs") |
+ total_secs = timings['results_processed'] - timings['started'] |
+ extract_secs = timings['extract_done'] - timings['started'] |
+ compile_secs = timings['compile_done'] - timings['extract_done'] |
+ process_secs = timings['results_processed'] - timings['compile_done'] |
+ resultfile.write('TEST(%s, Stats) { LOG(INFO) << "%s"; }\n' % ( |
+ suite_name, stats_template % ( |
+ timings['started'], timings['results_processed'], total_secs, |
+ extract_secs, compile_secs, process_secs))) |
+ |
+ |
+def ProcessTestResult(resultfile, test): |
+ """Interprets and logs the result of a test started by StartTest() |
+ |
+ Args: |
+ resultfile: File object for .cc file that results are written to. |
+ test: The dictionary from StartTest() to process. |
+ """ |
+ # Snap a copy of stdout and stderr into the test dictionary immediately |
+ # cause we can only call this once on the Popen object, and lots of stuff |
+ # below will want access to it. |
+ proc = test['proc'] |
+ (stdout, stderr) = proc.communicate() |
+ |
+ if test['aborted_at'] != 0: |
+ FailTest(resultfile, test, "Compile timed out. Started %f ended %f." % |
+ (test['started_at'], test['aborted_at'])) |
+ return |
+ |
+ if test['expectations'] is None: |
+ # This signals a compiler sanity check test. Fail iff compilation failed. |
+ if proc.poll() == 0: |
+ PassTest(resultfile, test) |
+ return |
+ else: |
+ FailTest(resultfile, test, 'Sanity compile failed. Is compiler borked?', |
+ stdout, stderr) |
+ return |
+ elif proc.poll() == 0: |
+ # Handle failure due to successful compile. |
+ FailTest(resultfile, test, |
+ 'Unexpected successful compilation.', |
+ stdout, stderr) |
+ return |
+ else: |
+ # Check the output has the right expectations. If there are no |
+ # expectations, then we just consider the output "matched" by default. |
+ if len(test['expectations']) == 0: |
+ PassTest(resultfile, test) |
+ return |
+ |
+ # Otherwise test against all expectations. |
+ for regexp in test['expectations']: |
+ if (regexp.search(stdout) is not None or |
+ regexp.search(stderr) is not None): |
+ PassTest(resultfile, test) |
+ return |
+ expectation_str = ', '.join( |
+ ["r'%s'" % regexp.pattern for regexp in test['expectations']]) |
+ FailTest(resultfile, test, |
+ 'Expectations [%s] did not match output.' % expectation_str, |
+ stdout, stderr) |
+ return |
+ |
+ |
+def CompleteAtLeastOneTest(resultfile, executing_tests): |
+ """Blocks until at least one task is removed from executing_tests. |
+ |
+ This function removes completed tests from executing_tests, logging failures |
+ and output. If no tests can be removed, it will enter a poll-loop until one |
+ test finishes or times out. On a timeout, this function is responsible for |
+ terminating the process in the appropriate fashion. |
+ |
+ Args: |
+ executing_tests: A dict mapping a string containing the test name to the |
+ test dict return from StartTest(). |
+ |
+ Returns: |
+ A list of tests that have finished. |
+ """ |
+ finished_tests = [] |
+ busy_loop_timeout = time.time() + BUSY_LOOP_MAX_TIME_SEC |
+ while len(finished_tests) == 0: |
+ # If we don't make progress for too long, assume the code is just dead. |
+ assert busy_loop_timeout > time.time() |
+ |
+ # Select on the output pipes. |
+ read_set = [] |
+ for test in executing_tests.values(): |
+ read_set.extend([test['proc'].stderr, test['proc'].stdout]) |
+ result = select.select(read_set, [], read_set, NCTEST_TERMINATE_TIMEOUT_SEC) |
+ |
+ # Now attempt to process results. |
+ now = time.time() |
+ for test in executing_tests.values(): |
+ proc = test['proc'] |
+ if proc.poll() is not None: |
+ test['finished_at'] = now |
+ finished_tests.append(test) |
+ elif test['terminate_timeout'] < now: |
+ proc.terminate() |
+ test['aborted_at'] = now |
+ elif test['kill_timeout'] < now: |
+ proc.kill() |
+ test['aborted_at'] = now |
+ |
+ for test in finished_tests: |
+ del executing_tests[test['name']] |
+ return finished_tests |
+ |
+ |
+def main(): |
+ if len(sys.argv) != 5: |
+ print ('Usage: %s <parallelism> <sourcefile> <cflags> <resultfile>' % |
+ sys.argv[0]) |
+ sys.exit(1) |
+ |
+ # Force us into the "C" locale so the compiler doesn't localize its output. |
+ # In particular, this stops gcc from using smart quotes when in english UTF-8 |
+ # locales. This makes the expectation writing much easier. |
+ os.environ['LC_ALL'] = 'C' |
+ |
+ parallelism = int(sys.argv[1]) |
+ sourcefile_path = sys.argv[2] |
+ cflags = sys.argv[3] |
+ resultfile_path = sys.argv[4] |
+ |
+ timings = {'started': time.time()} |
+ |
+ ValidateInput(parallelism, sourcefile_path, cflags, resultfile_path) |
+ |
+ test_configs = ExtractTestConfigs(sourcefile_path) |
+ timings['extract_done'] = time.time() |
+ |
+ resultfile = open(resultfile_path, 'w') |
+ resultfile.write(RESULT_FILE_HEADER % sourcefile_path) |
+ |
+ # Run the no-compile tests, but ensure we do not run more than |parallelism| |
+ # tests at once. |
+ timings['header_written'] = time.time() |
+ executing_tests = {} |
+ finished_tests = [] |
+ for config in test_configs: |
+ # CompleteAtLeastOneTest blocks until at least one test finishes. Thus, this |
+ # acts as a semaphore. We cannot use threads + a real semaphore because |
+ # subprocess forks, which can cause all sorts of hilarity with threads. |
+ if len(executing_tests) >= parallelism: |
+ finished_tests.extend(CompleteAtLeastOneTest(resultfile, executing_tests)) |
+ |
+ if config['name'].startswith('DISABLED_'): |
+ PassTest(resultfile, config) |
+ else: |
+ test = StartTest(sourcefile_path, cflags, config) |
+ assert test['name'] not in executing_tests |
+ executing_tests[test['name']] = test |
+ |
+ # If there are no more test to start, we still need to drain the running |
+ # ones. |
+ while len(executing_tests) > 0: |
+ finished_tests.extend(CompleteAtLeastOneTest(resultfile, executing_tests)) |
+ timings['compile_done'] = time.time() |
+ |
+ for test in finished_tests: |
+ ProcessTestResult(resultfile, test) |
+ timings['results_processed'] = time.time() |
+ |
+ # We always know at least a sanity test was run. |
+ WriteStats(resultfile, finished_tests[0]['suite_name'], timings) |
+ |
+ resultfile.close() |
+ |
+ |
+if __name__ == '__main__': |
+ main() |