Index: tools/telemetry/third_party/coverage/coverage/misc.py |
diff --git a/third_party/pycoverage/coverage/misc.py b/tools/telemetry/third_party/coverage/coverage/misc.py |
similarity index 54% |
copy from third_party/pycoverage/coverage/misc.py |
copy to tools/telemetry/third_party/coverage/coverage/misc.py |
index 0378173fcc3ddc92d00ca94d88d7ef3fec746a25..44f89772926396a82b86f4732b384fd26c0f5280 100644 |
--- a/third_party/pycoverage/coverage/misc.py |
+++ b/tools/telemetry/third_party/coverage/coverage/misc.py |
@@ -1,12 +1,38 @@ |
-"""Miscellaneous stuff for Coverage.""" |
+# Licensed under the Apache License: http://www.apache.org/licenses/LICENSE-2.0 |
+# For details: https://bitbucket.org/ned/coveragepy/src/default/NOTICE.txt |
+ |
+"""Miscellaneous stuff for coverage.py.""" |
import errno |
+import hashlib |
import inspect |
import os |
-import sys |
-from coverage.backward import md5, sorted # pylint: disable=W0622 |
-from coverage.backward import string_class, to_bytes |
+from coverage import env |
+from coverage.backward import string_class, to_bytes, unicode_class |
+ |
+ |
+# Use PyContracts for assertion testing on parameters and returns, but only if |
+# we are running our own test suite. |
+if env.TESTING: |
+ from contracts import contract # pylint: disable=unused-import |
+ from contracts import new_contract |
+ |
+ try: |
+ # Define contract words that PyContract doesn't have. |
+ new_contract('bytes', lambda v: isinstance(v, bytes)) |
+ if env.PY3: |
+ new_contract('unicode', lambda v: isinstance(v, unicode_class)) |
+ except ValueError: |
+ # During meta-coverage, this module is imported twice, and PyContracts |
+ # doesn't like redefining contracts. It's OK. |
+ pass |
+else: # pragma: not covered |
+ # We aren't using real PyContracts, so just define a no-op decorator as a |
+ # stunt double. |
+ def contract(**unused): |
+ """Dummy no-op implementation of `contract`.""" |
+ return lambda func: func |
def nice_pair(pair): |
@@ -42,7 +68,7 @@ def format_lines(statements, lines): |
lines = sorted(lines) |
while i < len(statements) and j < len(lines): |
if statements[i] == lines[j]: |
- if start == None: |
+ if start is None: |
start = lines[j] |
end = lines[j] |
j += 1 |
@@ -56,25 +82,25 @@ def format_lines(statements, lines): |
return ret |
-def short_stack(): |
- """Return a string summarizing the call stack.""" |
- stack = inspect.stack()[:0:-1] |
- return "\n".join(["%30s : %s @%d" % (t[3],t[1],t[2]) for t in stack]) |
- |
- |
def expensive(fn): |
- """A decorator to cache the result of an expensive operation. |
+ """A decorator to indicate that a method shouldn't be called more than once. |
- Only applies to methods with no arguments. |
+ Normally, this does nothing. During testing, this raises an exception if |
+ called more than once. |
""" |
- attr = "_cache_" + fn.__name__ |
- def _wrapped(self): |
- """Inner fn that checks the cache.""" |
- if not hasattr(self, attr): |
- setattr(self, attr, fn(self)) |
- return getattr(self, attr) |
- return _wrapped |
+ if env.TESTING: |
+ attr = "_once_" + fn.__name__ |
+ |
+ def _wrapped(self): |
+ """Inner function that checks the cache.""" |
+ if hasattr(self, attr): |
+ raise Exception("Shouldn't have called %s more than once" % fn.__name__) |
+ setattr(self, attr, True) |
+ return fn(self) |
+ return _wrapped |
+ else: |
+ return fn |
def bool_or_none(b): |
@@ -87,20 +113,14 @@ def bool_or_none(b): |
def join_regex(regexes): |
"""Combine a list of regexes into one that matches any of them.""" |
- if len(regexes) > 1: |
- return "|".join(["(%s)" % r for r in regexes]) |
- elif regexes: |
- return regexes[0] |
- else: |
- return "" |
+ return "|".join("(?:%s)" % r for r in regexes) |
def file_be_gone(path): |
"""Remove a file, and don't get annoyed if it doesn't exist.""" |
try: |
os.remove(path) |
- except OSError: |
- _, e, _ = sys.exc_info() |
+ except OSError as e: |
if e.errno != errno.ENOENT: |
raise |
@@ -108,13 +128,15 @@ def file_be_gone(path): |
class Hasher(object): |
"""Hashes Python data into md5.""" |
def __init__(self): |
- self.md5 = md5() |
+ self.md5 = hashlib.md5() |
def update(self, v): |
"""Add `v` to the hash, recursively if needed.""" |
self.md5.update(to_bytes(str(type(v)))) |
if isinstance(v, string_class): |
self.md5.update(to_bytes(v)) |
+ elif isinstance(v, bytes): |
+ self.md5.update(v) |
elif v is None: |
pass |
elif isinstance(v, (int, float)): |
@@ -137,27 +159,48 @@ class Hasher(object): |
self.update(k) |
self.update(a) |
- def digest(self): |
- """Retrieve the digest of the hash.""" |
- return self.md5.digest() |
+ def hexdigest(self): |
+ """Retrieve the hex digest of the hash.""" |
+ return self.md5.hexdigest() |
+ |
+ |
+def _needs_to_implement(that, func_name): |
+ """Helper to raise NotImplementedError in interface stubs.""" |
+ if hasattr(that, "_coverage_plugin_name"): |
+ thing = "Plugin" |
+ name = that._coverage_plugin_name |
+ else: |
+ thing = "Class" |
+ klass = that.__class__ |
+ name = "{klass.__module__}.{klass.__name__}".format(klass=klass) |
+ |
+ raise NotImplementedError( |
+ "{thing} {name!r} needs to implement {func_name}()".format( |
+ thing=thing, name=name, func_name=func_name |
+ ) |
+ ) |
class CoverageException(Exception): |
- """An exception specific to Coverage.""" |
+ """An exception specific to coverage.py.""" |
pass |
+ |
class NoSource(CoverageException): |
"""We couldn't find the source for a module.""" |
pass |
+ |
class NoCode(NoSource): |
"""We couldn't find any code at all.""" |
pass |
+ |
class NotPython(CoverageException): |
"""A source file turned out not to be parsable Python.""" |
pass |
+ |
class ExceptionDuringRun(CoverageException): |
"""An exception happened while running customer code. |