OLD | NEW |
(Empty) | |
| 1 from __future__ import print_function, absolute_import |
| 2 import os |
| 3 import tempfile |
| 4 import unittest |
| 5 import sys |
| 6 import re |
| 7 import warnings |
| 8 import io |
| 9 from textwrap import dedent |
| 10 |
| 11 from future.utils import bind_method, PY26, PY3, PY2, PY27 |
| 12 from future.moves.subprocess import check_output, STDOUT, CalledProcessError |
| 13 |
| 14 if PY26: |
| 15 import unittest2 as unittest |
| 16 |
| 17 |
| 18 def reformat_code(code): |
| 19 """ |
| 20 Removes any leading \n and dedents. |
| 21 """ |
| 22 if code.startswith('\n'): |
| 23 code = code[1:] |
| 24 return dedent(code) |
| 25 |
| 26 |
| 27 def order_future_lines(code): |
| 28 """ |
| 29 Returns the code block with any ``__future__`` import lines sorted, and |
| 30 then any ``future`` import lines sorted, then any ``builtins`` import lines |
| 31 sorted. |
| 32 |
| 33 This only sorts the lines within the expected blocks. |
| 34 |
| 35 See test_order_future_lines() for an example. |
| 36 """ |
| 37 |
| 38 # We need .splitlines(keepends=True), which doesn't exist on Py2, |
| 39 # so we use this instead: |
| 40 lines = code.split('\n') |
| 41 |
| 42 uufuture_line_numbers = [i for i, line in enumerate(lines) |
| 43 if line.startswith('from __future__ import ')] |
| 44 |
| 45 future_line_numbers = [i for i, line in enumerate(lines) |
| 46 if line.startswith('from future') |
| 47 or line.startswith('from past')] |
| 48 |
| 49 builtins_line_numbers = [i for i, line in enumerate(lines) |
| 50 if line.startswith('from builtins')] |
| 51 |
| 52 assert code.lstrip() == code, ('internal usage error: ' |
| 53 'dedent the code before calling order_future_lines()') |
| 54 |
| 55 def mymax(numbers): |
| 56 return max(numbers) if len(numbers) > 0 else 0 |
| 57 |
| 58 def mymin(numbers): |
| 59 return min(numbers) if len(numbers) > 0 else float('inf') |
| 60 |
| 61 assert mymax(uufuture_line_numbers) <= mymin(future_line_numbers), \ |
| 62 'the __future__ and future imports are out of order' |
| 63 |
| 64 # assert mymax(future_line_numbers) <= mymin(builtins_line_numbers), \ |
| 65 # 'the future and builtins imports are out of order' |
| 66 |
| 67 uul = sorted([lines[i] for i in uufuture_line_numbers]) |
| 68 sorted_uufuture_lines = dict(zip(uufuture_line_numbers, uul)) |
| 69 |
| 70 fl = sorted([lines[i] for i in future_line_numbers]) |
| 71 sorted_future_lines = dict(zip(future_line_numbers, fl)) |
| 72 |
| 73 bl = sorted([lines[i] for i in builtins_line_numbers]) |
| 74 sorted_builtins_lines = dict(zip(builtins_line_numbers, bl)) |
| 75 |
| 76 # Replace the old unsorted "from __future__ import ..." lines with the |
| 77 # new sorted ones: |
| 78 new_lines = [] |
| 79 for i in range(len(lines)): |
| 80 if i in uufuture_line_numbers: |
| 81 new_lines.append(sorted_uufuture_lines[i]) |
| 82 elif i in future_line_numbers: |
| 83 new_lines.append(sorted_future_lines[i]) |
| 84 elif i in builtins_line_numbers: |
| 85 new_lines.append(sorted_builtins_lines[i]) |
| 86 else: |
| 87 new_lines.append(lines[i]) |
| 88 return '\n'.join(new_lines) |
| 89 |
| 90 |
| 91 class VerboseCalledProcessError(CalledProcessError): |
| 92 """ |
| 93 Like CalledProcessError, but it displays more information (message and |
| 94 script output) for diagnosing test failures etc. |
| 95 """ |
| 96 def __init__(self, msg, returncode, cmd, output=None): |
| 97 self.msg = msg |
| 98 self.returncode = returncode |
| 99 self.cmd = cmd |
| 100 self.output = output |
| 101 |
| 102 def __str__(self): |
| 103 return ("Command '%s' failed with exit status %d\nMessage: %s\nOutput: %
s" |
| 104 % (self.cmd, self.returncode, self.msg, self.output)) |
| 105 |
| 106 class FuturizeError(VerboseCalledProcessError): |
| 107 pass |
| 108 |
| 109 class PasteurizeError(VerboseCalledProcessError): |
| 110 pass |
| 111 |
| 112 |
| 113 class CodeHandler(unittest.TestCase): |
| 114 """ |
| 115 Handy mixin for test classes for writing / reading / futurizing / |
| 116 running .py files in the test suite. |
| 117 """ |
| 118 def setUp(self): |
| 119 """ |
| 120 The outputs from the various futurize stages should have the |
| 121 following headers: |
| 122 """ |
| 123 # After stage1: |
| 124 # TODO: use this form after implementing a fixer to consolidate |
| 125 # __future__ imports into a single line: |
| 126 # self.headers1 = """ |
| 127 # from __future__ import absolute_import, division, print_function |
| 128 # """ |
| 129 self.headers1 = reformat_code(""" |
| 130 from __future__ import absolute_import |
| 131 from __future__ import division |
| 132 from __future__ import print_function |
| 133 """) |
| 134 |
| 135 # After stage2 --all-imports: |
| 136 # TODO: use this form after implementing a fixer to consolidate |
| 137 # __future__ imports into a single line: |
| 138 # self.headers2 = """ |
| 139 # from __future__ import (absolute_import, division, |
| 140 # print_function, unicode_literals) |
| 141 # from future import standard_library |
| 142 # from future.builtins import * |
| 143 # """ |
| 144 self.headers2 = reformat_code(""" |
| 145 from __future__ import absolute_import |
| 146 from __future__ import division |
| 147 from __future__ import print_function |
| 148 from __future__ import unicode_literals |
| 149 from future import standard_library |
| 150 standard_library.install_aliases() |
| 151 from builtins import * |
| 152 """) |
| 153 self.interpreters = [sys.executable] |
| 154 self.tempdir = tempfile.mkdtemp() + os.path.sep |
| 155 pypath = os.getenv('PYTHONPATH') |
| 156 if pypath: |
| 157 self.env = {'PYTHONPATH': os.getcwd() + os.pathsep + pypath} |
| 158 else: |
| 159 self.env = {'PYTHONPATH': os.getcwd()} |
| 160 |
| 161 def convert(self, code, stages=(1, 2), all_imports=False, from3=False, |
| 162 reformat=True, run=True, conservative=False): |
| 163 """ |
| 164 Converts the code block using ``futurize`` and returns the |
| 165 resulting code. |
| 166 |
| 167 Passing stages=[1] or stages=[2] passes the flag ``--stage1`` or |
| 168 ``stage2`` to ``futurize``. Passing both stages runs ``futurize`` |
| 169 with both stages by default. |
| 170 |
| 171 If from3 is False, runs ``futurize``, converting from Python 2 to |
| 172 both 2 and 3. If from3 is True, runs ``pasteurize`` to convert |
| 173 from Python 3 to both 2 and 3. |
| 174 |
| 175 Optionally reformats the code block first using the reformat() function. |
| 176 |
| 177 If run is True, runs the resulting code under all Python |
| 178 interpreters in self.interpreters. |
| 179 """ |
| 180 if reformat: |
| 181 code = reformat_code(code) |
| 182 self._write_test_script(code) |
| 183 self._futurize_test_script(stages=stages, all_imports=all_imports, |
| 184 from3=from3, conservative=conservative) |
| 185 output = self._read_test_script() |
| 186 if run: |
| 187 for interpreter in self.interpreters: |
| 188 _ = self._run_test_script(interpreter=interpreter) |
| 189 return output |
| 190 |
| 191 def compare(self, output, expected, ignore_imports=True): |
| 192 """ |
| 193 Compares whether the code blocks are equal. If not, raises an |
| 194 exception so the test fails. Ignores any trailing whitespace like |
| 195 blank lines. |
| 196 |
| 197 If ignore_imports is True, passes the code blocks into the |
| 198 strip_future_imports method. |
| 199 |
| 200 If one code block is a unicode string and the other a |
| 201 byte-string, it assumes the byte-string is encoded as utf-8. |
| 202 """ |
| 203 if ignore_imports: |
| 204 output = self.strip_future_imports(output) |
| 205 expected = self.strip_future_imports(expected) |
| 206 if isinstance(output, bytes) and not isinstance(expected, bytes): |
| 207 output = output.decode('utf-8') |
| 208 if isinstance(expected, bytes) and not isinstance(output, bytes): |
| 209 expected = expected.decode('utf-8') |
| 210 self.assertEqual(order_future_lines(output.rstrip()), |
| 211 expected.rstrip()) |
| 212 |
| 213 def strip_future_imports(self, code): |
| 214 """ |
| 215 Strips any of these import lines: |
| 216 |
| 217 from __future__ import <anything> |
| 218 from future <anything> |
| 219 from future.<anything> |
| 220 from builtins <anything> |
| 221 |
| 222 or any line containing: |
| 223 install_hooks() |
| 224 or: |
| 225 install_aliases() |
| 226 |
| 227 Limitation: doesn't handle imports split across multiple lines like |
| 228 this: |
| 229 |
| 230 from __future__ import (absolute_import, division, print_function, |
| 231 unicode_literals) |
| 232 """ |
| 233 output = [] |
| 234 # We need .splitlines(keepends=True), which doesn't exist on Py2, |
| 235 # so we use this instead: |
| 236 for line in code.split('\n'): |
| 237 if not (line.startswith('from __future__ import ') |
| 238 or line.startswith('from future ') |
| 239 or line.startswith('from builtins ') |
| 240 or 'install_hooks()' in line |
| 241 or 'install_aliases()' in line |
| 242 # but don't match "from future_builtins" :) |
| 243 or line.startswith('from future.')): |
| 244 output.append(line) |
| 245 return '\n'.join(output) |
| 246 |
| 247 def convert_check(self, before, expected, stages=(1, 2), all_imports=False, |
| 248 ignore_imports=True, from3=False, run=True, |
| 249 conservative=False): |
| 250 """ |
| 251 Convenience method that calls convert() and compare(). |
| 252 |
| 253 Reformats the code blocks automatically using the reformat_code() |
| 254 function. |
| 255 |
| 256 If all_imports is passed, we add the appropriate import headers |
| 257 for the stage(s) selected to the ``expected`` code-block, so they |
| 258 needn't appear repeatedly in the test code. |
| 259 |
| 260 If ignore_imports is True, ignores the presence of any lines |
| 261 beginning: |
| 262 |
| 263 from __future__ import ... |
| 264 from future import ... |
| 265 |
| 266 for the purpose of the comparison. |
| 267 """ |
| 268 output = self.convert(before, stages=stages, all_imports=all_imports, |
| 269 from3=from3, run=run, conservative=conservative) |
| 270 if all_imports: |
| 271 headers = self.headers2 if 2 in stages else self.headers1 |
| 272 else: |
| 273 headers = '' |
| 274 |
| 275 self.compare(output, headers + reformat_code(expected), |
| 276 ignore_imports=ignore_imports) |
| 277 |
| 278 def unchanged(self, code, **kwargs): |
| 279 """ |
| 280 Convenience method to ensure the code is unchanged by the |
| 281 futurize process. |
| 282 """ |
| 283 self.convert_check(code, code, **kwargs) |
| 284 |
| 285 def _write_test_script(self, code, filename='mytestscript.py'): |
| 286 """ |
| 287 Dedents the given code (a multiline string) and writes it out to |
| 288 a file in a temporary folder like /tmp/tmpUDCn7x/mytestscript.py. |
| 289 """ |
| 290 if isinstance(code, bytes): |
| 291 code = code.decode('utf-8') |
| 292 # Be explicit about encoding the temp file as UTF-8 (issue #63): |
| 293 with io.open(self.tempdir + filename, 'wt', encoding='utf-8') as f: |
| 294 f.write(dedent(code)) |
| 295 |
| 296 def _read_test_script(self, filename='mytestscript.py'): |
| 297 with io.open(self.tempdir + filename, 'rt', encoding='utf-8') as f: |
| 298 newsource = f.read() |
| 299 return newsource |
| 300 |
| 301 def _futurize_test_script(self, filename='mytestscript.py', stages=(1, 2), |
| 302 all_imports=False, from3=False, |
| 303 conservative=False): |
| 304 params = [] |
| 305 stages = list(stages) |
| 306 if all_imports: |
| 307 params.append('--all-imports') |
| 308 if from3: |
| 309 script = 'pasteurize.py' |
| 310 else: |
| 311 script = 'futurize.py' |
| 312 if stages == [1]: |
| 313 params.append('--stage1') |
| 314 elif stages == [2]: |
| 315 params.append('--stage2') |
| 316 else: |
| 317 assert stages == [1, 2] |
| 318 if conservative: |
| 319 params.append('--conservative') |
| 320 # No extra params needed |
| 321 |
| 322 # Absolute file path: |
| 323 fn = self.tempdir + filename |
| 324 call_args = [sys.executable, script] + params + ['-w', fn] |
| 325 try: |
| 326 output = check_output(call_args, stderr=STDOUT, env=self.env) |
| 327 except CalledProcessError as e: |
| 328 with open(fn) as f: |
| 329 msg = ( |
| 330 'Error running the command %s\n' |
| 331 '%s\n' |
| 332 'Contents of file %s:\n' |
| 333 '\n' |
| 334 '%s') % ( |
| 335 ' '.join(call_args), |
| 336 'env=%s' % self.env, |
| 337 fn, |
| 338 '----\n%s\n----' % f.read(), |
| 339 ) |
| 340 ErrorClass = (FuturizeError if 'futurize' in script else PasteurizeE
rror) |
| 341 raise ErrorClass(msg, e.returncode, e.cmd, output=e.output) |
| 342 return output |
| 343 |
| 344 def _run_test_script(self, filename='mytestscript.py', |
| 345 interpreter=sys.executable): |
| 346 # Absolute file path: |
| 347 fn = self.tempdir + filename |
| 348 try: |
| 349 output = check_output([interpreter, fn], |
| 350 env=self.env, stderr=STDOUT) |
| 351 except CalledProcessError as e: |
| 352 with open(fn) as f: |
| 353 msg = ( |
| 354 'Error running the command %s\n' |
| 355 '%s\n' |
| 356 'Contents of file %s:\n' |
| 357 '\n' |
| 358 '%s') % ( |
| 359 ' '.join([interpreter, fn]), |
| 360 'env=%s' % self.env, |
| 361 fn, |
| 362 '----\n%s\n----' % f.read(), |
| 363 ) |
| 364 if not hasattr(e, 'output'): |
| 365 # The attribute CalledProcessError.output doesn't exist on Py2.6 |
| 366 e.output = None |
| 367 raise VerboseCalledProcessError(msg, e.returncode, e.cmd, output=e.o
utput) |
| 368 return output |
| 369 |
| 370 |
| 371 # Decorator to skip some tests on Python 2.6 ... |
| 372 skip26 = unittest.skipIf(PY26, "this test is known to fail on Py2.6") |
| 373 |
| 374 |
| 375 def expectedFailurePY3(func): |
| 376 if not PY3: |
| 377 return func |
| 378 return unittest.expectedFailure(func) |
| 379 |
| 380 def expectedFailurePY26(func): |
| 381 if not PY26: |
| 382 return func |
| 383 return unittest.expectedFailure(func) |
| 384 |
| 385 |
| 386 def expectedFailurePY27(func): |
| 387 if not PY27: |
| 388 return func |
| 389 return unittest.expectedFailure(func) |
| 390 |
| 391 |
| 392 def expectedFailurePY2(func): |
| 393 if not PY2: |
| 394 return func |
| 395 return unittest.expectedFailure(func) |
| 396 |
| 397 |
| 398 # Renamed in Py3.3: |
| 399 if not hasattr(unittest.TestCase, 'assertRaisesRegex'): |
| 400 unittest.TestCase.assertRaisesRegex = unittest.TestCase.assertRaisesRegexp |
| 401 |
| 402 # From Py3.3: |
| 403 def assertRegex(self, text, expected_regex, msg=None): |
| 404 """Fail the test unless the text matches the regular expression.""" |
| 405 if isinstance(expected_regex, (str, unicode)): |
| 406 assert expected_regex, "expected_regex must not be empty." |
| 407 expected_regex = re.compile(expected_regex) |
| 408 if not expected_regex.search(text): |
| 409 msg = msg or "Regex didn't match" |
| 410 msg = '%s: %r not found in %r' % (msg, expected_regex.pattern, text) |
| 411 raise self.failureException(msg) |
| 412 |
| 413 if not hasattr(unittest.TestCase, 'assertRegex'): |
| 414 bind_method(unittest.TestCase, 'assertRegex', assertRegex) |
| 415 |
| 416 class _AssertRaisesBaseContext(object): |
| 417 |
| 418 def __init__(self, expected, test_case, callable_obj=None, |
| 419 expected_regex=None): |
| 420 self.expected = expected |
| 421 self.test_case = test_case |
| 422 if callable_obj is not None: |
| 423 try: |
| 424 self.obj_name = callable_obj.__name__ |
| 425 except AttributeError: |
| 426 self.obj_name = str(callable_obj) |
| 427 else: |
| 428 self.obj_name = None |
| 429 if isinstance(expected_regex, (bytes, str)): |
| 430 expected_regex = re.compile(expected_regex) |
| 431 self.expected_regex = expected_regex |
| 432 self.msg = None |
| 433 |
| 434 def _raiseFailure(self, standardMsg): |
| 435 msg = self.test_case._formatMessage(self.msg, standardMsg) |
| 436 raise self.test_case.failureException(msg) |
| 437 |
| 438 def handle(self, name, callable_obj, args, kwargs): |
| 439 """ |
| 440 If callable_obj is None, assertRaises/Warns is being used as a |
| 441 context manager, so check for a 'msg' kwarg and return self. |
| 442 If callable_obj is not None, call it passing args and kwargs. |
| 443 """ |
| 444 if callable_obj is None: |
| 445 self.msg = kwargs.pop('msg', None) |
| 446 return self |
| 447 with self: |
| 448 callable_obj(*args, **kwargs) |
| 449 |
| 450 class _AssertWarnsContext(_AssertRaisesBaseContext): |
| 451 """A context manager used to implement TestCase.assertWarns* methods.""" |
| 452 |
| 453 def __enter__(self): |
| 454 # The __warningregistry__'s need to be in a pristine state for tests |
| 455 # to work properly. |
| 456 for v in sys.modules.values(): |
| 457 if getattr(v, '__warningregistry__', None): |
| 458 v.__warningregistry__ = {} |
| 459 self.warnings_manager = warnings.catch_warnings(record=True) |
| 460 self.warnings = self.warnings_manager.__enter__() |
| 461 warnings.simplefilter("always", self.expected) |
| 462 return self |
| 463 |
| 464 def __exit__(self, exc_type, exc_value, tb): |
| 465 self.warnings_manager.__exit__(exc_type, exc_value, tb) |
| 466 if exc_type is not None: |
| 467 # let unexpected exceptions pass through |
| 468 return |
| 469 try: |
| 470 exc_name = self.expected.__name__ |
| 471 except AttributeError: |
| 472 exc_name = str(self.expected) |
| 473 first_matching = None |
| 474 for m in self.warnings: |
| 475 w = m.message |
| 476 if not isinstance(w, self.expected): |
| 477 continue |
| 478 if first_matching is None: |
| 479 first_matching = w |
| 480 if (self.expected_regex is not None and |
| 481 not self.expected_regex.search(str(w))): |
| 482 continue |
| 483 # store warning for later retrieval |
| 484 self.warning = w |
| 485 self.filename = m.filename |
| 486 self.lineno = m.lineno |
| 487 return |
| 488 # Now we simply try to choose a helpful failure message |
| 489 if first_matching is not None: |
| 490 self._raiseFailure('"{}" does not match "{}"'.format( |
| 491 self.expected_regex.pattern, str(first_matching))) |
| 492 if self.obj_name: |
| 493 self._raiseFailure("{} not triggered by {}".format(exc_name, |
| 494 self.obj_name)) |
| 495 else: |
| 496 self._raiseFailure("{} not triggered".format(exc_name)) |
| 497 |
| 498 |
| 499 def assertWarns(self, expected_warning, callable_obj=None, *args, **kwargs): |
| 500 """Fail unless a warning of class warnClass is triggered |
| 501 by callable_obj when invoked with arguments args and keyword |
| 502 arguments kwargs. If a different type of warning is |
| 503 triggered, it will not be handled: depending on the other |
| 504 warning filtering rules in effect, it might be silenced, printed |
| 505 out, or raised as an exception. |
| 506 |
| 507 If called with callable_obj omitted or None, will return a |
| 508 context object used like this:: |
| 509 |
| 510 with self.assertWarns(SomeWarning): |
| 511 do_something() |
| 512 |
| 513 An optional keyword argument 'msg' can be provided when assertWarns |
| 514 is used as a context object. |
| 515 |
| 516 The context manager keeps a reference to the first matching |
| 517 warning as the 'warning' attribute; similarly, the 'filename' |
| 518 and 'lineno' attributes give you information about the line |
| 519 of Python code from which the warning was triggered. |
| 520 This allows you to inspect the warning after the assertion:: |
| 521 |
| 522 with self.assertWarns(SomeWarning) as cm: |
| 523 do_something() |
| 524 the_warning = cm.warning |
| 525 self.assertEqual(the_warning.some_attribute, 147) |
| 526 """ |
| 527 context = _AssertWarnsContext(expected_warning, self, callable_obj) |
| 528 return context.handle('assertWarns', callable_obj, args, kwargs) |
| 529 |
| 530 if not hasattr(unittest.TestCase, 'assertWarns'): |
| 531 bind_method(unittest.TestCase, 'assertWarns', assertWarns) |
OLD | NEW |