| Index: third_party/google-endpoints/future/tests/base.py
|
| diff --git a/third_party/google-endpoints/future/tests/base.py b/third_party/google-endpoints/future/tests/base.py
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..546c779b097253aa60754ec8144b74fd19d6d90d
|
| --- /dev/null
|
| +++ b/third_party/google-endpoints/future/tests/base.py
|
| @@ -0,0 +1,531 @@
|
| +from __future__ import print_function, absolute_import
|
| +import os
|
| +import tempfile
|
| +import unittest
|
| +import sys
|
| +import re
|
| +import warnings
|
| +import io
|
| +from textwrap import dedent
|
| +
|
| +from future.utils import bind_method, PY26, PY3, PY2, PY27
|
| +from future.moves.subprocess import check_output, STDOUT, CalledProcessError
|
| +
|
| +if PY26:
|
| + import unittest2 as unittest
|
| +
|
| +
|
| +def reformat_code(code):
|
| + """
|
| + Removes any leading \n and dedents.
|
| + """
|
| + if code.startswith('\n'):
|
| + code = code[1:]
|
| + return dedent(code)
|
| +
|
| +
|
| +def order_future_lines(code):
|
| + """
|
| + Returns the code block with any ``__future__`` import lines sorted, and
|
| + then any ``future`` import lines sorted, then any ``builtins`` import lines
|
| + sorted.
|
| +
|
| + This only sorts the lines within the expected blocks.
|
| +
|
| + See test_order_future_lines() for an example.
|
| + """
|
| +
|
| + # We need .splitlines(keepends=True), which doesn't exist on Py2,
|
| + # so we use this instead:
|
| + lines = code.split('\n')
|
| +
|
| + uufuture_line_numbers = [i for i, line in enumerate(lines)
|
| + if line.startswith('from __future__ import ')]
|
| +
|
| + future_line_numbers = [i for i, line in enumerate(lines)
|
| + if line.startswith('from future')
|
| + or line.startswith('from past')]
|
| +
|
| + builtins_line_numbers = [i for i, line in enumerate(lines)
|
| + if line.startswith('from builtins')]
|
| +
|
| + assert code.lstrip() == code, ('internal usage error: '
|
| + 'dedent the code before calling order_future_lines()')
|
| +
|
| + def mymax(numbers):
|
| + return max(numbers) if len(numbers) > 0 else 0
|
| +
|
| + def mymin(numbers):
|
| + return min(numbers) if len(numbers) > 0 else float('inf')
|
| +
|
| + assert mymax(uufuture_line_numbers) <= mymin(future_line_numbers), \
|
| + 'the __future__ and future imports are out of order'
|
| +
|
| + # assert mymax(future_line_numbers) <= mymin(builtins_line_numbers), \
|
| + # 'the future and builtins imports are out of order'
|
| +
|
| + uul = sorted([lines[i] for i in uufuture_line_numbers])
|
| + sorted_uufuture_lines = dict(zip(uufuture_line_numbers, uul))
|
| +
|
| + fl = sorted([lines[i] for i in future_line_numbers])
|
| + sorted_future_lines = dict(zip(future_line_numbers, fl))
|
| +
|
| + bl = sorted([lines[i] for i in builtins_line_numbers])
|
| + sorted_builtins_lines = dict(zip(builtins_line_numbers, bl))
|
| +
|
| + # Replace the old unsorted "from __future__ import ..." lines with the
|
| + # new sorted ones:
|
| + new_lines = []
|
| + for i in range(len(lines)):
|
| + if i in uufuture_line_numbers:
|
| + new_lines.append(sorted_uufuture_lines[i])
|
| + elif i in future_line_numbers:
|
| + new_lines.append(sorted_future_lines[i])
|
| + elif i in builtins_line_numbers:
|
| + new_lines.append(sorted_builtins_lines[i])
|
| + else:
|
| + new_lines.append(lines[i])
|
| + return '\n'.join(new_lines)
|
| +
|
| +
|
| +class VerboseCalledProcessError(CalledProcessError):
|
| + """
|
| + Like CalledProcessError, but it displays more information (message and
|
| + script output) for diagnosing test failures etc.
|
| + """
|
| + def __init__(self, msg, returncode, cmd, output=None):
|
| + self.msg = msg
|
| + self.returncode = returncode
|
| + self.cmd = cmd
|
| + self.output = output
|
| +
|
| + def __str__(self):
|
| + return ("Command '%s' failed with exit status %d\nMessage: %s\nOutput: %s"
|
| + % (self.cmd, self.returncode, self.msg, self.output))
|
| +
|
| +class FuturizeError(VerboseCalledProcessError):
|
| + pass
|
| +
|
| +class PasteurizeError(VerboseCalledProcessError):
|
| + pass
|
| +
|
| +
|
| +class CodeHandler(unittest.TestCase):
|
| + """
|
| + Handy mixin for test classes for writing / reading / futurizing /
|
| + running .py files in the test suite.
|
| + """
|
| + def setUp(self):
|
| + """
|
| + The outputs from the various futurize stages should have the
|
| + following headers:
|
| + """
|
| + # After stage1:
|
| + # TODO: use this form after implementing a fixer to consolidate
|
| + # __future__ imports into a single line:
|
| + # self.headers1 = """
|
| + # from __future__ import absolute_import, division, print_function
|
| + # """
|
| + self.headers1 = reformat_code("""
|
| + from __future__ import absolute_import
|
| + from __future__ import division
|
| + from __future__ import print_function
|
| + """)
|
| +
|
| + # After stage2 --all-imports:
|
| + # TODO: use this form after implementing a fixer to consolidate
|
| + # __future__ imports into a single line:
|
| + # self.headers2 = """
|
| + # from __future__ import (absolute_import, division,
|
| + # print_function, unicode_literals)
|
| + # from future import standard_library
|
| + # from future.builtins import *
|
| + # """
|
| + self.headers2 = reformat_code("""
|
| + from __future__ import absolute_import
|
| + from __future__ import division
|
| + from __future__ import print_function
|
| + from __future__ import unicode_literals
|
| + from future import standard_library
|
| + standard_library.install_aliases()
|
| + from builtins import *
|
| + """)
|
| + self.interpreters = [sys.executable]
|
| + self.tempdir = tempfile.mkdtemp() + os.path.sep
|
| + pypath = os.getenv('PYTHONPATH')
|
| + if pypath:
|
| + self.env = {'PYTHONPATH': os.getcwd() + os.pathsep + pypath}
|
| + else:
|
| + self.env = {'PYTHONPATH': os.getcwd()}
|
| +
|
| + def convert(self, code, stages=(1, 2), all_imports=False, from3=False,
|
| + reformat=True, run=True, conservative=False):
|
| + """
|
| + Converts the code block using ``futurize`` and returns the
|
| + resulting code.
|
| +
|
| + Passing stages=[1] or stages=[2] passes the flag ``--stage1`` or
|
| + ``stage2`` to ``futurize``. Passing both stages runs ``futurize``
|
| + with both stages by default.
|
| +
|
| + If from3 is False, runs ``futurize``, converting from Python 2 to
|
| + both 2 and 3. If from3 is True, runs ``pasteurize`` to convert
|
| + from Python 3 to both 2 and 3.
|
| +
|
| + Optionally reformats the code block first using the reformat() function.
|
| +
|
| + If run is True, runs the resulting code under all Python
|
| + interpreters in self.interpreters.
|
| + """
|
| + if reformat:
|
| + code = reformat_code(code)
|
| + self._write_test_script(code)
|
| + self._futurize_test_script(stages=stages, all_imports=all_imports,
|
| + from3=from3, conservative=conservative)
|
| + output = self._read_test_script()
|
| + if run:
|
| + for interpreter in self.interpreters:
|
| + _ = self._run_test_script(interpreter=interpreter)
|
| + return output
|
| +
|
| + def compare(self, output, expected, ignore_imports=True):
|
| + """
|
| + Compares whether the code blocks are equal. If not, raises an
|
| + exception so the test fails. Ignores any trailing whitespace like
|
| + blank lines.
|
| +
|
| + If ignore_imports is True, passes the code blocks into the
|
| + strip_future_imports method.
|
| +
|
| + If one code block is a unicode string and the other a
|
| + byte-string, it assumes the byte-string is encoded as utf-8.
|
| + """
|
| + if ignore_imports:
|
| + output = self.strip_future_imports(output)
|
| + expected = self.strip_future_imports(expected)
|
| + if isinstance(output, bytes) and not isinstance(expected, bytes):
|
| + output = output.decode('utf-8')
|
| + if isinstance(expected, bytes) and not isinstance(output, bytes):
|
| + expected = expected.decode('utf-8')
|
| + self.assertEqual(order_future_lines(output.rstrip()),
|
| + expected.rstrip())
|
| +
|
| + def strip_future_imports(self, code):
|
| + """
|
| + Strips any of these import lines:
|
| +
|
| + from __future__ import <anything>
|
| + from future <anything>
|
| + from future.<anything>
|
| + from builtins <anything>
|
| +
|
| + or any line containing:
|
| + install_hooks()
|
| + or:
|
| + install_aliases()
|
| +
|
| + Limitation: doesn't handle imports split across multiple lines like
|
| + this:
|
| +
|
| + from __future__ import (absolute_import, division, print_function,
|
| + unicode_literals)
|
| + """
|
| + output = []
|
| + # We need .splitlines(keepends=True), which doesn't exist on Py2,
|
| + # so we use this instead:
|
| + for line in code.split('\n'):
|
| + if not (line.startswith('from __future__ import ')
|
| + or line.startswith('from future ')
|
| + or line.startswith('from builtins ')
|
| + or 'install_hooks()' in line
|
| + or 'install_aliases()' in line
|
| + # but don't match "from future_builtins" :)
|
| + or line.startswith('from future.')):
|
| + output.append(line)
|
| + return '\n'.join(output)
|
| +
|
| + def convert_check(self, before, expected, stages=(1, 2), all_imports=False,
|
| + ignore_imports=True, from3=False, run=True,
|
| + conservative=False):
|
| + """
|
| + Convenience method that calls convert() and compare().
|
| +
|
| + Reformats the code blocks automatically using the reformat_code()
|
| + function.
|
| +
|
| + If all_imports is passed, we add the appropriate import headers
|
| + for the stage(s) selected to the ``expected`` code-block, so they
|
| + needn't appear repeatedly in the test code.
|
| +
|
| + If ignore_imports is True, ignores the presence of any lines
|
| + beginning:
|
| +
|
| + from __future__ import ...
|
| + from future import ...
|
| +
|
| + for the purpose of the comparison.
|
| + """
|
| + output = self.convert(before, stages=stages, all_imports=all_imports,
|
| + from3=from3, run=run, conservative=conservative)
|
| + if all_imports:
|
| + headers = self.headers2 if 2 in stages else self.headers1
|
| + else:
|
| + headers = ''
|
| +
|
| + self.compare(output, headers + reformat_code(expected),
|
| + ignore_imports=ignore_imports)
|
| +
|
| + def unchanged(self, code, **kwargs):
|
| + """
|
| + Convenience method to ensure the code is unchanged by the
|
| + futurize process.
|
| + """
|
| + self.convert_check(code, code, **kwargs)
|
| +
|
| + def _write_test_script(self, code, filename='mytestscript.py'):
|
| + """
|
| + Dedents the given code (a multiline string) and writes it out to
|
| + a file in a temporary folder like /tmp/tmpUDCn7x/mytestscript.py.
|
| + """
|
| + if isinstance(code, bytes):
|
| + code = code.decode('utf-8')
|
| + # Be explicit about encoding the temp file as UTF-8 (issue #63):
|
| + with io.open(self.tempdir + filename, 'wt', encoding='utf-8') as f:
|
| + f.write(dedent(code))
|
| +
|
| + def _read_test_script(self, filename='mytestscript.py'):
|
| + with io.open(self.tempdir + filename, 'rt', encoding='utf-8') as f:
|
| + newsource = f.read()
|
| + return newsource
|
| +
|
| + def _futurize_test_script(self, filename='mytestscript.py', stages=(1, 2),
|
| + all_imports=False, from3=False,
|
| + conservative=False):
|
| + params = []
|
| + stages = list(stages)
|
| + if all_imports:
|
| + params.append('--all-imports')
|
| + if from3:
|
| + script = 'pasteurize.py'
|
| + else:
|
| + script = 'futurize.py'
|
| + if stages == [1]:
|
| + params.append('--stage1')
|
| + elif stages == [2]:
|
| + params.append('--stage2')
|
| + else:
|
| + assert stages == [1, 2]
|
| + if conservative:
|
| + params.append('--conservative')
|
| + # No extra params needed
|
| +
|
| + # Absolute file path:
|
| + fn = self.tempdir + filename
|
| + call_args = [sys.executable, script] + params + ['-w', fn]
|
| + try:
|
| + output = check_output(call_args, stderr=STDOUT, env=self.env)
|
| + except CalledProcessError as e:
|
| + with open(fn) as f:
|
| + msg = (
|
| + 'Error running the command %s\n'
|
| + '%s\n'
|
| + 'Contents of file %s:\n'
|
| + '\n'
|
| + '%s') % (
|
| + ' '.join(call_args),
|
| + 'env=%s' % self.env,
|
| + fn,
|
| + '----\n%s\n----' % f.read(),
|
| + )
|
| + ErrorClass = (FuturizeError if 'futurize' in script else PasteurizeError)
|
| + raise ErrorClass(msg, e.returncode, e.cmd, output=e.output)
|
| + return output
|
| +
|
| + def _run_test_script(self, filename='mytestscript.py',
|
| + interpreter=sys.executable):
|
| + # Absolute file path:
|
| + fn = self.tempdir + filename
|
| + try:
|
| + output = check_output([interpreter, fn],
|
| + env=self.env, stderr=STDOUT)
|
| + except CalledProcessError as e:
|
| + with open(fn) as f:
|
| + msg = (
|
| + 'Error running the command %s\n'
|
| + '%s\n'
|
| + 'Contents of file %s:\n'
|
| + '\n'
|
| + '%s') % (
|
| + ' '.join([interpreter, fn]),
|
| + 'env=%s' % self.env,
|
| + fn,
|
| + '----\n%s\n----' % f.read(),
|
| + )
|
| + if not hasattr(e, 'output'):
|
| + # The attribute CalledProcessError.output doesn't exist on Py2.6
|
| + e.output = None
|
| + raise VerboseCalledProcessError(msg, e.returncode, e.cmd, output=e.output)
|
| + return output
|
| +
|
| +
|
| +# Decorator to skip some tests on Python 2.6 ...
|
| +skip26 = unittest.skipIf(PY26, "this test is known to fail on Py2.6")
|
| +
|
| +
|
| +def expectedFailurePY3(func):
|
| + if not PY3:
|
| + return func
|
| + return unittest.expectedFailure(func)
|
| +
|
| +def expectedFailurePY26(func):
|
| + if not PY26:
|
| + return func
|
| + return unittest.expectedFailure(func)
|
| +
|
| +
|
| +def expectedFailurePY27(func):
|
| + if not PY27:
|
| + return func
|
| + return unittest.expectedFailure(func)
|
| +
|
| +
|
| +def expectedFailurePY2(func):
|
| + if not PY2:
|
| + return func
|
| + return unittest.expectedFailure(func)
|
| +
|
| +
|
| +# Renamed in Py3.3:
|
| +if not hasattr(unittest.TestCase, 'assertRaisesRegex'):
|
| + unittest.TestCase.assertRaisesRegex = unittest.TestCase.assertRaisesRegexp
|
| +
|
| +# From Py3.3:
|
| +def assertRegex(self, text, expected_regex, msg=None):
|
| + """Fail the test unless the text matches the regular expression."""
|
| + if isinstance(expected_regex, (str, unicode)):
|
| + assert expected_regex, "expected_regex must not be empty."
|
| + expected_regex = re.compile(expected_regex)
|
| + if not expected_regex.search(text):
|
| + msg = msg or "Regex didn't match"
|
| + msg = '%s: %r not found in %r' % (msg, expected_regex.pattern, text)
|
| + raise self.failureException(msg)
|
| +
|
| +if not hasattr(unittest.TestCase, 'assertRegex'):
|
| + bind_method(unittest.TestCase, 'assertRegex', assertRegex)
|
| +
|
| +class _AssertRaisesBaseContext(object):
|
| +
|
| + def __init__(self, expected, test_case, callable_obj=None,
|
| + expected_regex=None):
|
| + self.expected = expected
|
| + self.test_case = test_case
|
| + if callable_obj is not None:
|
| + try:
|
| + self.obj_name = callable_obj.__name__
|
| + except AttributeError:
|
| + self.obj_name = str(callable_obj)
|
| + else:
|
| + self.obj_name = None
|
| + if isinstance(expected_regex, (bytes, str)):
|
| + expected_regex = re.compile(expected_regex)
|
| + self.expected_regex = expected_regex
|
| + self.msg = None
|
| +
|
| + def _raiseFailure(self, standardMsg):
|
| + msg = self.test_case._formatMessage(self.msg, standardMsg)
|
| + raise self.test_case.failureException(msg)
|
| +
|
| + def handle(self, name, callable_obj, args, kwargs):
|
| + """
|
| + If callable_obj is None, assertRaises/Warns is being used as a
|
| + context manager, so check for a 'msg' kwarg and return self.
|
| + If callable_obj is not None, call it passing args and kwargs.
|
| + """
|
| + if callable_obj is None:
|
| + self.msg = kwargs.pop('msg', None)
|
| + return self
|
| + with self:
|
| + callable_obj(*args, **kwargs)
|
| +
|
| +class _AssertWarnsContext(_AssertRaisesBaseContext):
|
| + """A context manager used to implement TestCase.assertWarns* methods."""
|
| +
|
| + def __enter__(self):
|
| + # The __warningregistry__'s need to be in a pristine state for tests
|
| + # to work properly.
|
| + for v in sys.modules.values():
|
| + if getattr(v, '__warningregistry__', None):
|
| + v.__warningregistry__ = {}
|
| + self.warnings_manager = warnings.catch_warnings(record=True)
|
| + self.warnings = self.warnings_manager.__enter__()
|
| + warnings.simplefilter("always", self.expected)
|
| + return self
|
| +
|
| + def __exit__(self, exc_type, exc_value, tb):
|
| + self.warnings_manager.__exit__(exc_type, exc_value, tb)
|
| + if exc_type is not None:
|
| + # let unexpected exceptions pass through
|
| + return
|
| + try:
|
| + exc_name = self.expected.__name__
|
| + except AttributeError:
|
| + exc_name = str(self.expected)
|
| + first_matching = None
|
| + for m in self.warnings:
|
| + w = m.message
|
| + if not isinstance(w, self.expected):
|
| + continue
|
| + if first_matching is None:
|
| + first_matching = w
|
| + if (self.expected_regex is not None and
|
| + not self.expected_regex.search(str(w))):
|
| + continue
|
| + # store warning for later retrieval
|
| + self.warning = w
|
| + self.filename = m.filename
|
| + self.lineno = m.lineno
|
| + return
|
| + # Now we simply try to choose a helpful failure message
|
| + if first_matching is not None:
|
| + self._raiseFailure('"{}" does not match "{}"'.format(
|
| + self.expected_regex.pattern, str(first_matching)))
|
| + if self.obj_name:
|
| + self._raiseFailure("{} not triggered by {}".format(exc_name,
|
| + self.obj_name))
|
| + else:
|
| + self._raiseFailure("{} not triggered".format(exc_name))
|
| +
|
| +
|
| +def assertWarns(self, expected_warning, callable_obj=None, *args, **kwargs):
|
| + """Fail unless a warning of class warnClass is triggered
|
| + by callable_obj when invoked with arguments args and keyword
|
| + arguments kwargs. If a different type of warning is
|
| + triggered, it will not be handled: depending on the other
|
| + warning filtering rules in effect, it might be silenced, printed
|
| + out, or raised as an exception.
|
| +
|
| + If called with callable_obj omitted or None, will return a
|
| + context object used like this::
|
| +
|
| + with self.assertWarns(SomeWarning):
|
| + do_something()
|
| +
|
| + An optional keyword argument 'msg' can be provided when assertWarns
|
| + is used as a context object.
|
| +
|
| + The context manager keeps a reference to the first matching
|
| + warning as the 'warning' attribute; similarly, the 'filename'
|
| + and 'lineno' attributes give you information about the line
|
| + of Python code from which the warning was triggered.
|
| + This allows you to inspect the warning after the assertion::
|
| +
|
| + with self.assertWarns(SomeWarning) as cm:
|
| + do_something()
|
| + the_warning = cm.warning
|
| + self.assertEqual(the_warning.some_attribute, 147)
|
| + """
|
| + context = _AssertWarnsContext(expected_warning, self, callable_obj)
|
| + return context.handle('assertWarns', callable_obj, args, kwargs)
|
| +
|
| +if not hasattr(unittest.TestCase, 'assertWarns'):
|
| + bind_method(unittest.TestCase, 'assertWarns', assertWarns)
|
|
|