| Index: tools/telemetry/third_party/coverage/tests/test_farm.py
|
| diff --git a/tools/telemetry/third_party/coverage/tests/test_farm.py b/tools/telemetry/third_party/coverage/tests/test_farm.py
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..0db0af88e28d70c0bac1c9abe7a8aca0501af4ef
|
| --- /dev/null
|
| +++ b/tools/telemetry/third_party/coverage/tests/test_farm.py
|
| @@ -0,0 +1,412 @@
|
| +# Licensed under the Apache License: http://www.apache.org/licenses/LICENSE-2.0
|
| +# For details: https://bitbucket.org/ned/coveragepy/src/default/NOTICE.txt
|
| +
|
| +"""Run tests in the farm sub-directory. Designed for nose."""
|
| +
|
| +import difflib
|
| +import filecmp
|
| +import fnmatch
|
| +import glob
|
| +import os
|
| +import re
|
| +import shutil
|
| +import sys
|
| +import unittest
|
| +
|
| +from nose.plugins.skip import SkipTest
|
| +
|
| +from coverage.test_helpers import ModuleAwareMixin, SysPathAwareMixin, change_dir, saved_sys_path
|
| +from tests.helpers import run_command
|
| +from tests.backtest import execfile # pylint: disable=redefined-builtin
|
| +
|
| +from coverage.debug import _TEST_NAME_FILE
|
| +
|
| +
|
| +def test_farm(clean_only=False):
|
| + """A test-generating function for nose to find and run."""
|
| + for fname in glob.glob("tests/farm/*/*.py"):
|
| + case = FarmTestCase(fname, clean_only)
|
| + yield (case,)
|
| +
|
| +
|
| +# "rU" was deprecated in 3.4
|
| +READ_MODE = "rU" if sys.version_info < (3, 4) else "r"
|
| +
|
| +
|
| +class FarmTestCase(ModuleAwareMixin, SysPathAwareMixin, unittest.TestCase):
|
| + """A test case from the farm tree.
|
| +
|
| + Tests are short Python script files, often called run.py:
|
| +
|
| + copy("src", "out")
|
| + run('''
|
| + coverage run white.py
|
| + coverage annotate white.py
|
| + ''', rundir="out")
|
| + compare("out", "gold", "*,cover")
|
| + clean("out")
|
| +
|
| + Verbs (copy, run, compare, clean) are methods in this class. FarmTestCase
|
| + has options to allow various uses of the test cases (normal execution,
|
| + cleaning-only, or run and leave the results for debugging).
|
| +
|
| + This class is a unittest.TestCase so that we can use behavior-modifying
|
| + mixins, but it's only useful as a nose test function. Yes, this is
|
| + confusing.
|
| +
|
| + """
|
| +
|
| + # We don't want test runners finding this and instantiating it themselves.
|
| + __test__ = False
|
| +
|
| + def __init__(self, runpy, clean_only=False, dont_clean=False):
|
| + """Create a test case from a run.py file.
|
| +
|
| + `clean_only` means that only the clean() action is executed.
|
| + `dont_clean` means that the clean() action is not executed.
|
| +
|
| + """
|
| + super(FarmTestCase, self).__init__()
|
| +
|
| + self.description = runpy
|
| + self.dir, self.runpy = os.path.split(runpy)
|
| + self.clean_only = clean_only
|
| + self.dont_clean = dont_clean
|
| + self.ok = True
|
| +
|
| + def setUp(self):
|
| + """Test set up, run by nose before __call__."""
|
| + super(FarmTestCase, self).setUp()
|
| + # Modules should be importable from the current directory.
|
| + sys.path.insert(0, '')
|
| +
|
| + def tearDown(self):
|
| + """Test tear down, run by nose after __call__."""
|
| + # Make sure the test is cleaned up, unless we never want to, or if the
|
| + # test failed.
|
| + if not self.dont_clean and self.ok: # pragma: part covered
|
| + self.clean_only = True
|
| + self()
|
| +
|
| + super(FarmTestCase, self).tearDown()
|
| +
|
| + # This object will be run by nose via the __call__ method, and nose
|
| + # doesn't do cleanups in that case. Do them now.
|
| + self.doCleanups()
|
| +
|
| + def runTest(self):
|
| + """Here to make unittest.TestCase happy, but will never be invoked."""
|
| + raise Exception("runTest isn't used in this class!")
|
| +
|
| + def __call__(self):
|
| + """Execute the test from the run.py file."""
|
| + if _TEST_NAME_FILE: # pragma: debugging
|
| + with open(_TEST_NAME_FILE, "w") as f:
|
| + f.write(self.description.replace("/", "_"))
|
| +
|
| + # Prepare a dictionary of globals for the run.py files to use.
|
| + fns = """
|
| + copy run runfunc clean skip
|
| + compare contains contains_any doesnt_contain
|
| + """.split()
|
| + if self.clean_only:
|
| + glo = dict((fn, noop) for fn in fns)
|
| + glo['clean'] = clean
|
| + else:
|
| + glo = dict((fn, globals()[fn]) for fn in fns)
|
| + if self.dont_clean: # pragma: not covered
|
| + glo['clean'] = noop
|
| +
|
| + with change_dir(self.dir):
|
| + try:
|
| + execfile(self.runpy, glo)
|
| + except Exception:
|
| + self.ok = False
|
| + raise
|
| +
|
| + def run_fully(self): # pragma: not covered
|
| + """Run as a full test case, with setUp and tearDown."""
|
| + self.setUp()
|
| + try:
|
| + self()
|
| + finally:
|
| + self.tearDown()
|
| +
|
| +
|
| +# Functions usable inside farm run.py files
|
| +
|
| +def noop(*args_unused, **kwargs_unused):
|
| + """A no-op function to stub out run, copy, etc, when only cleaning."""
|
| + pass
|
| +
|
| +
|
| +def copy(src, dst):
|
| + """Copy a directory."""
|
| + if os.path.exists(dst):
|
| + shutil.rmtree(dst)
|
| + shutil.copytree(src, dst)
|
| +
|
| +
|
| +def run(cmds, rundir="src", outfile=None):
|
| + """Run a list of commands.
|
| +
|
| + `cmds` is a string, commands separated by newlines.
|
| + `rundir` is the directory in which to run the commands.
|
| + `outfile` is a file name to redirect stdout to.
|
| +
|
| + """
|
| + with change_dir(rundir):
|
| + if outfile:
|
| + fout = open(outfile, "a+")
|
| + try:
|
| + for cmd in cmds.split("\n"):
|
| + cmd = cmd.strip()
|
| + if not cmd:
|
| + continue
|
| + retcode, output = run_command(cmd)
|
| + print(output.rstrip())
|
| + if outfile:
|
| + fout.write(output)
|
| + if retcode:
|
| + raise Exception("command exited abnormally")
|
| + finally:
|
| + if outfile:
|
| + fout.close()
|
| +
|
| +
|
| +def runfunc(fn, rundir="src", addtopath=None):
|
| + """Run a function.
|
| +
|
| + `fn` is a callable.
|
| + `rundir` is the directory in which to run the function.
|
| +
|
| + """
|
| + with change_dir(rundir):
|
| + with saved_sys_path():
|
| + if addtopath is not None:
|
| + sys.path.insert(0, addtopath)
|
| + fn()
|
| +
|
| +
|
| +def compare(
|
| + dir1, dir2, file_pattern=None, size_within=0,
|
| + left_extra=False, right_extra=False, scrubs=None
|
| +):
|
| + """Compare files matching `file_pattern` in `dir1` and `dir2`.
|
| +
|
| + `dir2` is interpreted as a prefix, with Python version numbers appended
|
| + to find the actual directory to compare with. "foo" will compare
|
| + against "foo_v241", "foo_v24", "foo_v2", or "foo", depending on which
|
| + directory is found first.
|
| +
|
| + `size_within` is a percentage delta for the file sizes. If non-zero,
|
| + then the file contents are not compared (since they are expected to
|
| + often be different), but the file sizes must be within this amount.
|
| + For example, size_within=10 means that the two files' sizes must be
|
| + within 10 percent of each other to compare equal.
|
| +
|
| + `left_extra` true means the left directory can have extra files in it
|
| + without triggering an assertion. `right_extra` means the right
|
| + directory can.
|
| +
|
| + `scrubs` is a list of pairs, regexes to find and literal strings to
|
| + replace them with to scrub the files of unimportant differences.
|
| +
|
| + An assertion will be raised if the directories fail one of their
|
| + matches.
|
| +
|
| + """
|
| + # Search for a dir2 with a version suffix.
|
| + version_suff = ''.join(map(str, sys.version_info[:3]))
|
| + while version_suff:
|
| + trydir = dir2 + '_v' + version_suff
|
| + if os.path.exists(trydir):
|
| + dir2 = trydir
|
| + break
|
| + version_suff = version_suff[:-1]
|
| +
|
| + assert os.path.exists(dir1), "Left directory missing: %s" % dir1
|
| + assert os.path.exists(dir2), "Right directory missing: %s" % dir2
|
| +
|
| + dc = filecmp.dircmp(dir1, dir2)
|
| + diff_files = fnmatch_list(dc.diff_files, file_pattern)
|
| + left_only = fnmatch_list(dc.left_only, file_pattern)
|
| + right_only = fnmatch_list(dc.right_only, file_pattern)
|
| + show_diff = True
|
| +
|
| + if size_within:
|
| + # The files were already compared, use the diff_files list as a
|
| + # guide for size comparison.
|
| + wrong_size = []
|
| + for f in diff_files:
|
| + with open(os.path.join(dir1, f), "rb") as fobj:
|
| + left = fobj.read()
|
| + with open(os.path.join(dir2, f), "rb") as fobj:
|
| + right = fobj.read()
|
| + size_l, size_r = len(left), len(right)
|
| + big, little = max(size_l, size_r), min(size_l, size_r)
|
| + if (big - little) / float(little) > size_within/100.0:
|
| + # print "%d %d" % (big, little)
|
| + # print "Left: ---\n%s\n-----\n%s" % (left, right)
|
| + wrong_size.append("%s (%s,%s)" % (f, size_l, size_r))
|
| + if wrong_size:
|
| + print("File sizes differ between %s and %s: %s" % (
|
| + dir1, dir2, ", ".join(wrong_size)
|
| + ))
|
| +
|
| + # We'll show the diff iff the files differed enough in size.
|
| + show_diff = bool(wrong_size)
|
| +
|
| + if show_diff:
|
| + # filecmp only compares in binary mode, but we want text mode. So
|
| + # look through the list of different files, and compare them
|
| + # ourselves.
|
| + text_diff = []
|
| + for f in diff_files:
|
| + with open(os.path.join(dir1, f), READ_MODE) as fobj:
|
| + left = fobj.read()
|
| + with open(os.path.join(dir2, f), READ_MODE) as fobj:
|
| + right = fobj.read()
|
| + if scrubs:
|
| + left = scrub(left, scrubs)
|
| + right = scrub(right, scrubs)
|
| + if left != right:
|
| + text_diff.append(f)
|
| + left = left.splitlines()
|
| + right = right.splitlines()
|
| + print("\n".join(difflib.Differ().compare(left, right)))
|
| + assert not text_diff, "Files differ: %s" % text_diff
|
| +
|
| + if not left_extra:
|
| + assert not left_only, "Files in %s only: %s" % (dir1, left_only)
|
| + if not right_extra:
|
| + assert not right_only, "Files in %s only: %s" % (dir2, right_only)
|
| +
|
| +
|
| +def contains(filename, *strlist):
|
| + """Check that the file contains all of a list of strings.
|
| +
|
| + An assert will be raised if one of the arguments in `strlist` is
|
| + missing in `filename`.
|
| +
|
| + """
|
| + with open(filename, "r") as fobj:
|
| + text = fobj.read()
|
| + for s in strlist:
|
| + assert s in text, "Missing content in %s: %r" % (filename, s)
|
| +
|
| +
|
| +def contains_any(filename, *strlist):
|
| + """Check that the file contains at least one of a list of strings.
|
| +
|
| + An assert will be raised if none of the arguments in `strlist` is in
|
| + `filename`.
|
| +
|
| + """
|
| + with open(filename, "r") as fobj:
|
| + text = fobj.read()
|
| + for s in strlist:
|
| + if s in text:
|
| + return
|
| + assert False, "Missing content in %s: %r [1 of %d]" % (filename, strlist[0], len(strlist),)
|
| +
|
| +
|
| +def doesnt_contain(filename, *strlist):
|
| + """Check that the file contains none of a list of strings.
|
| +
|
| + An assert will be raised if any of the strings in `strlist` appears in
|
| + `filename`.
|
| +
|
| + """
|
| + with open(filename, "r") as fobj:
|
| + text = fobj.read()
|
| + for s in strlist:
|
| + assert s not in text, "Forbidden content in %s: %r" % (filename, s)
|
| +
|
| +
|
| +def clean(cleandir):
|
| + """Clean `cleandir` by removing it and all its children completely."""
|
| + # rmtree gives mysterious failures on Win7, so retry a "few" times.
|
| + # I've seen it take over 100 tries, so, 1000! This is probably the
|
| + # most unpleasant hack I've written in a long time...
|
| + tries = 1000
|
| + while tries: # pragma: part covered
|
| + if os.path.exists(cleandir):
|
| + try:
|
| + shutil.rmtree(cleandir)
|
| + except OSError: # pragma: not covered
|
| + if tries == 1:
|
| + raise
|
| + else:
|
| + tries -= 1
|
| + continue
|
| + break
|
| +
|
| +
|
| +def skip(msg=None):
|
| + """Skip the current test."""
|
| + raise SkipTest(msg)
|
| +
|
| +
|
| +# Helpers
|
| +
|
| +def fnmatch_list(files, file_pattern):
|
| + """Filter the list of `files` to only those that match `file_pattern`.
|
| +
|
| + If `file_pattern` is None, then return the entire list of files.
|
| +
|
| + Returns a list of the filtered files.
|
| +
|
| + """
|
| + if file_pattern:
|
| + files = [f for f in files if fnmatch.fnmatch(f, file_pattern)]
|
| + return files
|
| +
|
| +
|
| +def scrub(strdata, scrubs):
|
| + """Scrub uninteresting data from the payload in `strdata`.
|
| +
|
| + `scrubs` is a list of (find, replace) pairs of regexes that are used on
|
| + `strdata`. A string is returned.
|
| +
|
| + """
|
| + for rgx_find, rgx_replace in scrubs:
|
| + strdata = re.sub(rgx_find, re.escape(rgx_replace), strdata)
|
| + return strdata
|
| +
|
| +
|
| +def main(): # pragma: not covered
|
| + """Command-line access to test_farm.
|
| +
|
| + Commands:
|
| +
|
| + run testcase ... - Run specific test case(s)
|
| + out testcase ... - Run test cases, but don't clean up, leaving output.
|
| + clean - Clean all the output for all tests.
|
| +
|
| + """
|
| + try:
|
| + op = sys.argv[1]
|
| + except IndexError:
|
| + op = 'help'
|
| +
|
| + if op == 'run':
|
| + # Run the test for real.
|
| + for test_case in sys.argv[2:]:
|
| + case = FarmTestCase(test_case)
|
| + case.run_fully()
|
| + elif op == 'out':
|
| + # Run the test, but don't clean up, so we can examine the output.
|
| + for test_case in sys.argv[2:]:
|
| + case = FarmTestCase(test_case, dont_clean=True)
|
| + case.run_fully()
|
| + elif op == 'clean':
|
| + # Run all the tests, but just clean.
|
| + for test in test_farm(clean_only=True):
|
| + test[0].run_fully()
|
| + else:
|
| + print(main.__doc__)
|
| +
|
| +# So that we can run just one farm run.py at a time.
|
| +if __name__ == '__main__':
|
| + main()
|
|
|