| Index: tools/telemetry/third_party/coverage/tests/coveragetest.py
|
| diff --git a/tools/telemetry/third_party/coverage/tests/coveragetest.py b/tools/telemetry/third_party/coverage/tests/coveragetest.py
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..7db25de0b855df9f62afd2809e1fab468dfd8127
|
| --- /dev/null
|
| +++ b/tools/telemetry/third_party/coverage/tests/coveragetest.py
|
| @@ -0,0 +1,383 @@
|
| +# Licensed under the Apache License: http://www.apache.org/licenses/LICENSE-2.0
|
| +# For details: https://bitbucket.org/ned/coveragepy/src/default/NOTICE.txt
|
| +
|
| +"""Base test case class for coverage.py testing."""
|
| +
|
| +import datetime
|
| +import glob
|
| +import os
|
| +import random
|
| +import re
|
| +import shlex
|
| +import shutil
|
| +import sys
|
| +
|
| +import coverage
|
| +from coverage.backunittest import TestCase
|
| +from coverage.backward import StringIO, import_local_file, string_class
|
| +from coverage.cmdline import CoverageScript
|
| +from coverage.debug import _TEST_NAME_FILE, DebugControl
|
| +from coverage.test_helpers import (
|
| + EnvironmentAwareMixin, StdStreamCapturingMixin, TempDirMixin,
|
| +)
|
| +
|
| +from nose.plugins.skip import SkipTest
|
| +
|
| +from tests.helpers import run_command
|
| +
|
| +
|
| +# Status returns for the command line.
|
| +OK, ERR = 0, 1
|
| +
|
| +
|
| +class CoverageTest(
|
| + EnvironmentAwareMixin,
|
| + StdStreamCapturingMixin,
|
| + TempDirMixin,
|
| + TestCase
|
| +):
|
| + """A base class for coverage.py test cases."""
|
| +
|
| + # Standard unittest setting: show me diffs even if they are very long.
|
| + maxDiff = None
|
| +
|
| + # Tell newer unittest implementations to print long helpful messages.
|
| + longMessage = True
|
| +
|
| + def setUp(self):
|
| + super(CoverageTest, self).setUp()
|
| +
|
| + if _TEST_NAME_FILE: # pragma: debugging
|
| + with open(_TEST_NAME_FILE, "w") as f:
|
| + f.write("%s_%s" % (
|
| + self.__class__.__name__, self._testMethodName,
|
| + ))
|
| +
|
| + def skip(self, reason):
|
| + """Skip this test, and give a reason."""
|
| + self.class_behavior().skipped += 1
|
| + raise SkipTest(reason)
|
| +
|
| + def clean_local_file_imports(self):
|
| + """Clean up the results of calls to `import_local_file`.
|
| +
|
| + Use this if you need to `import_local_file` the same file twice in
|
| + one test.
|
| +
|
| + """
|
| + # So that we can re-import files, clean them out first.
|
| + self.cleanup_modules()
|
| + # Also have to clean out the .pyc file, since the timestamp
|
| + # resolution is only one second, a changed file might not be
|
| + # picked up.
|
| + for pyc in glob.glob('*.pyc'):
|
| + os.remove(pyc)
|
| + if os.path.exists("__pycache__"):
|
| + shutil.rmtree("__pycache__")
|
| +
|
| + def import_local_file(self, modname):
|
| + """Import a local file as a module.
|
| +
|
| + Opens a file in the current directory named `modname`.py, imports it
|
| + as `modname`, and returns the module object.
|
| +
|
| + """
|
| + return import_local_file(modname)
|
| +
|
| + def start_import_stop(self, cov, modname):
|
| + """Start coverage, import a file, then stop coverage.
|
| +
|
| + `cov` is started and stopped, with an `import_local_file` of
|
| + `modname` in the middle.
|
| +
|
| + The imported module is returned.
|
| +
|
| + """
|
| + cov.start()
|
| + try: # pragma: nested
|
| + # Import the Python file, executing it.
|
| + mod = self.import_local_file(modname)
|
| + finally: # pragma: nested
|
| + # Stop coverage.py.
|
| + cov.stop()
|
| + return mod
|
| +
|
| + def get_module_name(self):
|
| + """Return the module name to use for this test run."""
|
| + return 'coverage_test_' + str(random.random())[2:]
|
| +
|
| + # Map chars to numbers for arcz_to_arcs
|
| + _arcz_map = {'.': -1}
|
| + _arcz_map.update(dict((c, ord(c) - ord('0')) for c in '123456789'))
|
| + _arcz_map.update(dict(
|
| + (c, 10 + ord(c) - ord('A')) for c in 'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
|
| + ))
|
| +
|
| + def arcz_to_arcs(self, arcz):
|
| + """Convert a compact textual representation of arcs to a list of pairs.
|
| +
|
| + The text has space-separated pairs of letters. Period is -1, 1-9 are
|
| + 1-9, A-Z are 10 through 36. The resulting list is sorted regardless of
|
| + the order of the input pairs.
|
| +
|
| + ".1 12 2." --> [(-1,1), (1,2), (2,-1)]
|
| +
|
| + Minus signs can be included in the pairs:
|
| +
|
| + "-11, 12, 2-5" --> [(-1,1), (1,2), (2,-5)]
|
| +
|
| + """
|
| + arcs = []
|
| + for pair in arcz.split():
|
| + asgn = bsgn = 1
|
| + if len(pair) == 2:
|
| + a, b = pair
|
| + else:
|
| + assert len(pair) == 3
|
| + if pair[0] == '-':
|
| + _, a, b = pair
|
| + asgn = -1
|
| + else:
|
| + assert pair[1] == '-'
|
| + a, _, b = pair
|
| + bsgn = -1
|
| + arcs.append((asgn * self._arcz_map[a], bsgn * self._arcz_map[b]))
|
| + return sorted(arcs)
|
| +
|
| + def assert_equal_args(self, a1, a2, msg=None):
|
| + """Assert that the arc lists `a1` and `a2` are equal."""
|
| + # Make them into multi-line strings so we can see what's going wrong.
|
| + s1 = "\n".join(repr(a) for a in a1) + "\n"
|
| + s2 = "\n".join(repr(a) for a in a2) + "\n"
|
| + self.assertMultiLineEqual(s1, s2, msg)
|
| +
|
| + def check_coverage(
|
| + self, text, lines=None, missing="", report="",
|
| + excludes=None, partials="",
|
| + arcz=None, arcz_missing=None, arcz_unpredicted=None,
|
| + arcs=None, arcs_missing=None, arcs_unpredicted=None,
|
| + ):
|
| + """Check the coverage measurement of `text`.
|
| +
|
| + The source `text` is run and measured. `lines` are the line numbers
|
| + that are executable, or a list of possible line numbers, any of which
|
| + could match. `missing` are the lines not executed, `excludes` are
|
| + regexes to match against for excluding lines, and `report` is the text
|
| + of the measurement report.
|
| +
|
| + For arc measurement, `arcz` is a string that can be decoded into arcs
|
| + in the code (see `arcz_to_arcs` for the encoding scheme),
|
| + `arcz_missing` are the arcs that are not executed, and
|
| + `arcs_unpredicted` are the arcs executed in the code, but not deducible
|
| + from the code.
|
| +
|
| + Returns the Coverage object, in case you want to poke at it some more.
|
| +
|
| + """
|
| + # We write the code into a file so that we can import it.
|
| + # Coverage.py wants to deal with things as modules with file names.
|
| + modname = self.get_module_name()
|
| +
|
| + self.make_file(modname + ".py", text)
|
| +
|
| + if arcs is None and arcz is not None:
|
| + arcs = self.arcz_to_arcs(arcz)
|
| + if arcs_missing is None and arcz_missing is not None:
|
| + arcs_missing = self.arcz_to_arcs(arcz_missing)
|
| + if arcs_unpredicted is None and arcz_unpredicted is not None:
|
| + arcs_unpredicted = self.arcz_to_arcs(arcz_unpredicted)
|
| + branch = any(x is not None for x in [arcs, arcs_missing, arcs_unpredicted])
|
| +
|
| + # Start up coverage.py.
|
| + cov = coverage.Coverage(branch=branch)
|
| + cov.erase()
|
| + for exc in excludes or []:
|
| + cov.exclude(exc)
|
| + for par in partials or []:
|
| + cov.exclude(par, which='partial')
|
| +
|
| + mod = self.start_import_stop(cov, modname)
|
| +
|
| + # Clean up our side effects
|
| + del sys.modules[modname]
|
| +
|
| + # Get the analysis results, and check that they are right.
|
| + analysis = cov._analyze(mod)
|
| + statements = sorted(analysis.statements)
|
| + if lines is not None:
|
| + if isinstance(lines[0], int):
|
| + # lines is just a list of numbers, it must match the statements
|
| + # found in the code.
|
| + self.assertEqual(statements, lines)
|
| + else:
|
| + # lines is a list of possible line number lists, one of them
|
| + # must match.
|
| + for line_list in lines:
|
| + if statements == line_list:
|
| + break
|
| + else:
|
| + self.fail("None of the lines choices matched %r" % statements)
|
| +
|
| + missing_formatted = analysis.missing_formatted()
|
| + if isinstance(missing, string_class):
|
| + self.assertEqual(missing_formatted, missing)
|
| + else:
|
| + for missing_list in missing:
|
| + if missing_formatted == missing_list:
|
| + break
|
| + else:
|
| + self.fail("None of the missing choices matched %r" % missing_formatted)
|
| +
|
| + if arcs is not None:
|
| + self.assert_equal_args(analysis.arc_possibilities(), arcs, "Possible arcs differ")
|
| +
|
| + if arcs_missing is not None:
|
| + self.assert_equal_args(
|
| + analysis.arcs_missing(), arcs_missing,
|
| + "Missing arcs differ"
|
| + )
|
| +
|
| + if arcs_unpredicted is not None:
|
| + self.assert_equal_args(
|
| + analysis.arcs_unpredicted(), arcs_unpredicted,
|
| + "Unpredicted arcs differ"
|
| + )
|
| +
|
| + if report:
|
| + frep = StringIO()
|
| + cov.report(mod, file=frep)
|
| + rep = " ".join(frep.getvalue().split("\n")[2].split()[1:])
|
| + self.assertEqual(report, rep)
|
| +
|
| + return cov
|
| +
|
| + def nice_file(self, *fparts):
|
| + """Canonicalize the file name composed of the parts in `fparts`."""
|
| + fname = os.path.join(*fparts)
|
| + return os.path.normcase(os.path.abspath(os.path.realpath(fname)))
|
| +
|
| + def assert_same_files(self, flist1, flist2):
|
| + """Assert that `flist1` and `flist2` are the same set of file names."""
|
| + flist1_nice = [self.nice_file(f) for f in flist1]
|
| + flist2_nice = [self.nice_file(f) for f in flist2]
|
| + self.assertCountEqual(flist1_nice, flist2_nice)
|
| +
|
| + def assert_exists(self, fname):
|
| + """Assert that `fname` is a file that exists."""
|
| + msg = "File %r should exist" % fname
|
| + self.assertTrue(os.path.exists(fname), msg)
|
| +
|
| + def assert_doesnt_exist(self, fname):
|
| + """Assert that `fname` is a file that doesn't exist."""
|
| + msg = "File %r shouldn't exist" % fname
|
| + self.assertTrue(not os.path.exists(fname), msg)
|
| +
|
| + def assert_starts_with(self, s, prefix, msg=None):
|
| + """Assert that `s` starts with `prefix`."""
|
| + if not s.startswith(prefix):
|
| + self.fail(msg or ("%r doesn't start with %r" % (s, prefix)))
|
| +
|
| + def assert_recent_datetime(self, dt, seconds=10, msg=None):
|
| + """Assert that `dt` marks a time at most `seconds` seconds ago."""
|
| + age = datetime.datetime.now() - dt
|
| + # Python2.6 doesn't have total_seconds :(
|
| + self.assertEqual(age.days, 0, msg)
|
| + self.assertGreaterEqual(age.seconds, 0, msg)
|
| + self.assertLessEqual(age.seconds, seconds, msg)
|
| +
|
| + def command_line(self, args, ret=OK, _covpkg=None):
|
| + """Run `args` through the command line.
|
| +
|
| + Use this when you want to run the full coverage machinery, but in the
|
| + current process. Exceptions may be thrown from deep in the code.
|
| + Asserts that `ret` is returned by `CoverageScript.command_line`.
|
| +
|
| + Compare with `run_command`.
|
| +
|
| + Returns None.
|
| +
|
| + """
|
| + script = CoverageScript(_covpkg=_covpkg)
|
| + ret_actual = script.command_line(shlex.split(args))
|
| + self.assertEqual(ret_actual, ret)
|
| +
|
| + def run_command(self, cmd):
|
| + """Run the command-line `cmd` in a sub-process, and print its output.
|
| +
|
| + Use this when you need to test the process behavior of coverage.
|
| +
|
| + Compare with `command_line`.
|
| +
|
| + Returns the process' stdout text.
|
| +
|
| + """
|
| + # Running Python sub-processes can be tricky. Use the real name of our
|
| + # own executable. So "python foo.py" might get executed as
|
| + # "python3.3 foo.py". This is important because Python 3.x doesn't
|
| + # install as "python", so you might get a Python 2 executable instead
|
| + # if you don't use the executable's basename.
|
| + if cmd.startswith("python "):
|
| + cmd = os.path.basename(sys.executable) + cmd[6:]
|
| +
|
| + _, output = self.run_command_status(cmd)
|
| + return output
|
| +
|
| + def run_command_status(self, cmd):
|
| + """Run the command-line `cmd` in a sub-process, and print its output.
|
| +
|
| + Use this when you need to test the process behavior of coverage.
|
| +
|
| + Compare with `command_line`.
|
| +
|
| + Returns a pair: the process' exit status and stdout text.
|
| +
|
| + """
|
| + # Add our test modules directory to PYTHONPATH. I'm sure there's too
|
| + # much path munging here, but...
|
| + here = os.path.dirname(self.nice_file(coverage.__file__, ".."))
|
| + testmods = self.nice_file(here, 'tests/modules')
|
| + zipfile = self.nice_file(here, 'tests/zipmods.zip')
|
| + pypath = os.getenv('PYTHONPATH', '')
|
| + if pypath:
|
| + pypath += os.pathsep
|
| + pypath += testmods + os.pathsep + zipfile
|
| + self.set_environ('PYTHONPATH', pypath)
|
| +
|
| + status, output = run_command(cmd)
|
| + print(output)
|
| + return status, output
|
| +
|
| + def report_from_command(self, cmd):
|
| + """Return the report from the `cmd`, with some convenience added."""
|
| + report = self.run_command(cmd).replace('\\', '/')
|
| + self.assertNotIn("error", report.lower())
|
| + return report
|
| +
|
| + def report_lines(self, report):
|
| + """Return the lines of the report, as a list."""
|
| + lines = report.split('\n')
|
| + self.assertEqual(lines[-1], "")
|
| + return lines[:-1]
|
| +
|
| + def line_count(self, report):
|
| + """How many lines are in `report`?"""
|
| + return len(self.report_lines(report))
|
| +
|
| + def squeezed_lines(self, report):
|
| + """Return a list of the lines in report, with the spaces squeezed."""
|
| + lines = self.report_lines(report)
|
| + return [re.sub(r"\s+", " ", l.strip()) for l in lines]
|
| +
|
| + def last_line_squeezed(self, report):
|
| + """Return the last line of `report` with the spaces squeezed down."""
|
| + return self.squeezed_lines(report)[-1]
|
| +
|
| +
|
| +class DebugControlString(DebugControl):
|
| + """A `DebugControl` that writes to a StringIO, for testing."""
|
| + def __init__(self, options):
|
| + super(DebugControlString, self).__init__(options, StringIO())
|
| +
|
| + def get_output(self):
|
| + """Get the output text from the `DebugControl`."""
|
| + return self.output.getvalue()
|
|
|