Index: third_party/twisted_8_1/twisted/trial/unittest.py |
diff --git a/third_party/twisted_8_1/twisted/trial/unittest.py b/third_party/twisted_8_1/twisted/trial/unittest.py |
deleted file mode 100644 |
index a95218c23ac73cc510932c0b397267393ae41f87..0000000000000000000000000000000000000000 |
--- a/third_party/twisted_8_1/twisted/trial/unittest.py |
+++ /dev/null |
@@ -1,1454 +0,0 @@ |
-# -*- test-case-name: twisted.trial.test.test_tests -*- |
-# |
-# Copyright (c) 2001-2007 Twisted Matrix Laboratories. |
-# See LICENSE for details. |
- |
-""" |
-Things likely to be used by writers of unit tests. |
- |
-Maintainer: Jonathan Lange <jml@twistedmatrix.com> |
-""" |
- |
- |
-import doctest |
-import os, warnings, sys, tempfile, gc |
-from pprint import pformat |
- |
-from twisted.internet import defer, utils |
-from twisted.python import components, failure, log, monkey |
-from twisted.python.compat import set |
-from twisted.python import deprecate |
-from twisted.python.deprecate import getDeprecationWarningString |
- |
-from twisted.trial import itrial, reporter, util |
- |
-pyunit = __import__('unittest') |
- |
-from zope.interface import implements |
- |
- |
- |
-class SkipTest(Exception): |
- """ |
- Raise this (with a reason) to skip the current test. You may also set |
- method.skip to a reason string to skip it, or set class.skip to skip the |
- entire TestCase. |
- """ |
- |
- |
-class FailTest(AssertionError): |
- """Raised to indicate the current test has failed to pass.""" |
- |
- |
-class Todo(object): |
- """ |
- Internal object used to mark a L{TestCase} as 'todo'. Tests marked 'todo' |
- are reported differently in Trial L{TestResult}s. If todo'd tests fail, |
- they do not fail the suite and the errors are reported in a separate |
- category. If todo'd tests succeed, Trial L{TestResult}s will report an |
- unexpected success. |
- """ |
- |
- def __init__(self, reason, errors=None): |
- """ |
- @param reason: A string explaining why the test is marked 'todo' |
- |
- @param errors: An iterable of exception types that the test is |
- expected to raise. If one of these errors is raised by the test, it |
- will be trapped. Raising any other kind of error will fail the test. |
- If C{None} is passed, then all errors will be trapped. |
- """ |
- self.reason = reason |
- self.errors = errors |
- |
- def __repr__(self): |
- return "<Todo reason=%r errors=%r>" % (self.reason, self.errors) |
- |
- def expected(self, failure): |
- """ |
- @param failure: A L{twisted.python.failure.Failure}. |
- |
- @return: C{True} if C{failure} is expected, C{False} otherwise. |
- """ |
- if self.errors is None: |
- return True |
- for error in self.errors: |
- if failure.check(error): |
- return True |
- return False |
- |
- |
-def makeTodo(value): |
- """ |
- Return a L{Todo} object built from C{value}. |
- |
- If C{value} is a string, return a Todo that expects any exception with |
- C{value} as a reason. If C{value} is a tuple, the second element is used |
- as the reason and the first element as the excepted error(s). |
- |
- @param value: A string or a tuple of C{(errors, reason)}, where C{errors} |
- is either a single exception class or an iterable of exception classes. |
- |
- @return: A L{Todo} object. |
- """ |
- if isinstance(value, str): |
- return Todo(reason=value) |
- if isinstance(value, tuple): |
- errors, reason = value |
- try: |
- errors = list(errors) |
- except TypeError: |
- errors = [errors] |
- return Todo(reason=reason, errors=errors) |
- |
- |
-class _Assertions(pyunit.TestCase, object): |
- """ |
- Replaces many of the built-in TestCase assertions. In general, these |
- assertions provide better error messages and are easier to use in |
- callbacks. Also provides new assertions such as L{failUnlessFailure}. |
- |
- Although the tests are defined as 'failIf*' and 'failUnless*', they can |
- also be called as 'assertNot*' and 'assert*'. |
- """ |
- |
- def fail(self, msg=None): |
- """absolutely fails the test, do not pass go, do not collect $200 |
- |
- @param msg: the message that will be displayed as the reason for the |
- failure |
- """ |
- raise self.failureException(msg) |
- |
- def failIf(self, condition, msg=None): |
- """fails the test if C{condition} evaluates to False |
- |
- @param condition: any object that defines __nonzero__ |
- """ |
- if condition: |
- raise self.failureException(msg) |
- return condition |
- assertNot = assertFalse = failUnlessFalse = failIf |
- |
- def failUnless(self, condition, msg=None): |
- """fails the test if C{condition} evaluates to True |
- |
- @param condition: any object that defines __nonzero__ |
- """ |
- if not condition: |
- raise self.failureException(msg) |
- return condition |
- assert_ = assertTrue = failUnlessTrue = failUnless |
- |
- def failUnlessRaises(self, exception, f, *args, **kwargs): |
- """fails the test unless calling the function C{f} with the given C{args} |
- and C{kwargs} raises C{exception}. The failure will report the |
- traceback and call stack of the unexpected exception. |
- |
- @param exception: exception type that is to be expected |
- @param f: the function to call |
- |
- @return: The raised exception instance, if it is of the given type. |
- @raise self.failureException: Raised if the function call does not raise an exception |
- or if it raises an exception of a different type. |
- """ |
- try: |
- result = f(*args, **kwargs) |
- except exception, inst: |
- return inst |
- except: |
- raise self.failureException('%s raised instead of %s:\n %s' |
- % (sys.exc_info()[0], |
- exception.__name__, |
- failure.Failure().getTraceback())) |
- else: |
- raise self.failureException('%s not raised (%r returned)' |
- % (exception.__name__, result)) |
- assertRaises = failUnlessRaises |
- |
- def failUnlessEqual(self, first, second, msg=''): |
- """ |
- Fail the test if C{first} and C{second} are not equal. |
- |
- @param msg: A string describing the failure that's included in the |
- exception. |
- """ |
- if not first == second: |
- if msg is None: |
- msg = '' |
- if len(msg) > 0: |
- msg += '\n' |
- raise self.failureException( |
- '%snot equal:\na = %s\nb = %s\n' |
- % (msg, pformat(first), pformat(second))) |
- return first |
- assertEqual = assertEquals = failUnlessEquals = failUnlessEqual |
- |
- def failUnlessIdentical(self, first, second, msg=None): |
- """fail the test if C{first} is not C{second}. This is an |
- obect-identity-equality test, not an object equality (i.e. C{__eq__}) test |
- |
- @param msg: if msg is None, then the failure message will be |
- '%r is not %r' % (first, second) |
- """ |
- if first is not second: |
- raise self.failureException(msg or '%r is not %r' % (first, second)) |
- return first |
- assertIdentical = failUnlessIdentical |
- |
- def failIfIdentical(self, first, second, msg=None): |
- """fail the test if C{first} is C{second}. This is an |
- obect-identity-equality test, not an object equality (i.e. C{__eq__}) test |
- |
- @param msg: if msg is None, then the failure message will be |
- '%r is %r' % (first, second) |
- """ |
- if first is second: |
- raise self.failureException(msg or '%r is %r' % (first, second)) |
- return first |
- assertNotIdentical = failIfIdentical |
- |
- def failIfEqual(self, first, second, msg=None): |
- """fail the test if C{first} == C{second} |
- |
- @param msg: if msg is None, then the failure message will be |
- '%r == %r' % (first, second) |
- """ |
- if not first != second: |
- raise self.failureException(msg or '%r == %r' % (first, second)) |
- return first |
- assertNotEqual = assertNotEquals = failIfEquals = failIfEqual |
- |
- def failUnlessIn(self, containee, container, msg=None): |
- """fail the test if C{containee} is not found in C{container} |
- |
- @param containee: the value that should be in C{container} |
- @param container: a sequence type, or in the case of a mapping type, |
- will follow semantics of 'if key in dict.keys()' |
- @param msg: if msg is None, then the failure message will be |
- '%r not in %r' % (first, second) |
- """ |
- if containee not in container: |
- raise self.failureException(msg or "%r not in %r" |
- % (containee, container)) |
- return containee |
- assertIn = failUnlessIn |
- |
- def failIfIn(self, containee, container, msg=None): |
- """fail the test if C{containee} is found in C{container} |
- |
- @param containee: the value that should not be in C{container} |
- @param container: a sequence type, or in the case of a mapping type, |
- will follow semantics of 'if key in dict.keys()' |
- @param msg: if msg is None, then the failure message will be |
- '%r in %r' % (first, second) |
- """ |
- if containee in container: |
- raise self.failureException(msg or "%r in %r" |
- % (containee, container)) |
- return containee |
- assertNotIn = failIfIn |
- |
- def failIfAlmostEqual(self, first, second, places=7, msg=None): |
- """Fail if the two objects are equal as determined by their |
- difference rounded to the given number of decimal places |
- (default 7) and comparing to zero. |
- |
- @note: decimal places (from zero) is usually not the same |
- as significant digits (measured from the most |
- signficant digit). |
- |
- @note: included for compatiblity with PyUnit test cases |
- """ |
- if round(second-first, places) == 0: |
- raise self.failureException(msg or '%r == %r within %r places' |
- % (first, second, places)) |
- return first |
- assertNotAlmostEqual = assertNotAlmostEquals = failIfAlmostEqual |
- failIfAlmostEquals = failIfAlmostEqual |
- |
- def failUnlessAlmostEqual(self, first, second, places=7, msg=None): |
- """Fail if the two objects are unequal as determined by their |
- difference rounded to the given number of decimal places |
- (default 7) and comparing to zero. |
- |
- @note: decimal places (from zero) is usually not the same |
- as significant digits (measured from the most |
- signficant digit). |
- |
- @note: included for compatiblity with PyUnit test cases |
- """ |
- if round(second-first, places) != 0: |
- raise self.failureException(msg or '%r != %r within %r places' |
- % (first, second, places)) |
- return first |
- assertAlmostEqual = assertAlmostEquals = failUnlessAlmostEqual |
- failUnlessAlmostEquals = failUnlessAlmostEqual |
- |
- def failUnlessApproximates(self, first, second, tolerance, msg=None): |
- """asserts that C{first} - C{second} > C{tolerance} |
- |
- @param msg: if msg is None, then the failure message will be |
- '%r ~== %r' % (first, second) |
- """ |
- if abs(first - second) > tolerance: |
- raise self.failureException(msg or "%s ~== %s" % (first, second)) |
- return first |
- assertApproximates = failUnlessApproximates |
- |
- def failUnlessFailure(self, deferred, *expectedFailures): |
- """Assert that C{deferred} will errback with one of |
- C{expectedFailures}. Returns the original Deferred with callbacks |
- added. You will need to return this Deferred from your test case. |
- """ |
- def _cb(ignore): |
- raise self.failureException( |
- "did not catch an error, instead got %r" % (ignore,)) |
- |
- def _eb(failure): |
- if failure.check(*expectedFailures): |
- return failure.value |
- else: |
- output = ('\nExpected: %r\nGot:\n%s' |
- % (expectedFailures, str(failure))) |
- raise self.failureException(output) |
- return deferred.addCallbacks(_cb, _eb) |
- assertFailure = failUnlessFailure |
- |
- def failUnlessSubstring(self, substring, astring, msg=None): |
- return self.failUnlessIn(substring, astring, msg) |
- assertSubstring = failUnlessSubstring |
- |
- def failIfSubstring(self, substring, astring, msg=None): |
- return self.failIfIn(substring, astring, msg) |
- assertNotSubstring = failIfSubstring |
- |
- def failUnlessWarns(self, category, message, filename, f, |
- *args, **kwargs): |
- """ |
- Fail if the given function doesn't generate the specified warning when |
- called. It calls the function, checks the warning, and forwards the |
- result of the function if everything is fine. |
- |
- @param category: the category of the warning to check. |
- @param message: the output message of the warning to check. |
- @param filename: the filename where the warning should come from. |
- @param f: the function which is supposed to generate the warning. |
- @type f: any callable. |
- @param args: the arguments to C{f}. |
- @param kwargs: the keywords arguments to C{f}. |
- |
- @return: the result of the original function C{f}. |
- """ |
- warningsShown = [] |
- def warnExplicit(*args): |
- warningsShown.append(args) |
- |
- origExplicit = warnings.warn_explicit |
- try: |
- warnings.warn_explicit = warnExplicit |
- result = f(*args, **kwargs) |
- finally: |
- warnings.warn_explicit = origExplicit |
- |
- if not warningsShown: |
- self.fail("No warnings emitted") |
- first = warningsShown[0] |
- for other in warningsShown[1:]: |
- if other[:2] != first[:2]: |
- self.fail("Can't handle different warnings") |
- gotMessage, gotCategory, gotFilename, lineno = first[:4] |
- self.assertEqual(gotMessage, message) |
- self.assertIdentical(gotCategory, category) |
- |
- # Use starts with because of .pyc/.pyo issues. |
- self.failUnless( |
- filename.startswith(gotFilename), |
- 'Warning in %r, expected %r' % (gotFilename, filename)) |
- |
- # It would be nice to be able to check the line number as well, but |
- # different configurations actually end up reporting different line |
- # numbers (generally the variation is only 1 line, but that's enough |
- # to fail the test erroneously...). |
- # self.assertEqual(lineno, xxx) |
- |
- return result |
- assertWarns = failUnlessWarns |
- |
- def failUnlessIsInstance(self, instance, classOrTuple): |
- """ |
- Assert that the given instance is of the given class or of one of the |
- given classes. |
- |
- @param instance: the object to test the type (first argument of the |
- C{isinstance} call). |
- @type instance: any. |
- @param classOrTuple: the class or classes to test against (second |
- argument of the C{isinstance} call). |
- @type classOrTuple: class, type, or tuple. |
- """ |
- if not isinstance(instance, classOrTuple): |
- self.fail("%r is not an instance of %s" % (instance, classOrTuple)) |
- |
- assertIsInstance = failUnlessIsInstance |
- |
- def failIfIsInstance(self, instance, classOrTuple): |
- """ |
- Assert that the given instance is not of the given class or of one of |
- the given classes. |
- |
- @param instance: the object to test the type (first argument of the |
- C{isinstance} call). |
- @type instance: any. |
- @param classOrTuple: the class or classes to test against (second |
- argument of the C{isinstance} call). |
- @type classOrTuple: class, type, or tuple. |
- """ |
- if isinstance(instance, classOrTuple): |
- self.fail("%r is not an instance of %s" % (instance, classOrTuple)) |
- |
- assertNotIsInstance = failIfIsInstance |
- |
- |
-class _LogObserver(object): |
- """ |
- Observes the Twisted logs and catches any errors. |
- """ |
- |
- def __init__(self): |
- self._errors = [] |
- self._added = 0 |
- self._ignored = [] |
- |
- def _add(self): |
- if self._added == 0: |
- log.addObserver(self.gotEvent) |
- self._oldFE, log._flushErrors = (log._flushErrors, self.flushErrors) |
- self._oldIE, log._ignore = (log._ignore, self._ignoreErrors) |
- self._oldCI, log._clearIgnores = (log._clearIgnores, |
- self._clearIgnores) |
- self._added += 1 |
- |
- def _remove(self): |
- self._added -= 1 |
- if self._added == 0: |
- log.removeObserver(self.gotEvent) |
- log._flushErrors = self._oldFE |
- log._ignore = self._oldIE |
- log._clearIgnores = self._oldCI |
- |
- def _ignoreErrors(self, *errorTypes): |
- """ |
- Do not store any errors with any of the given types. |
- """ |
- self._ignored.extend(errorTypes) |
- |
- def _clearIgnores(self): |
- """ |
- Stop ignoring any errors we might currently be ignoring. |
- """ |
- self._ignored = [] |
- |
- def flushErrors(self, *errorTypes): |
- """ |
- Flush errors from the list of caught errors. If no arguments are |
- specified, remove all errors. If arguments are specified, only remove |
- errors of those types from the stored list. |
- """ |
- if errorTypes: |
- flushed = [] |
- remainder = [] |
- for f in self._errors: |
- if f.check(*errorTypes): |
- flushed.append(f) |
- else: |
- remainder.append(f) |
- self._errors = remainder |
- else: |
- flushed = self._errors |
- self._errors = [] |
- return flushed |
- |
- def getErrors(self): |
- """ |
- Return a list of errors caught by this observer. |
- """ |
- return self._errors |
- |
- def gotEvent(self, event): |
- """ |
- The actual observer method. Called whenever a message is logged. |
- |
- @param event: A dictionary containing the log message. Actual |
- structure undocumented (see source for L{twisted.python.log}). |
- """ |
- if event.get('isError', False) and 'failure' in event: |
- f = event['failure'] |
- if len(self._ignored) == 0 or not f.check(*self._ignored): |
- self._errors.append(f) |
- |
- |
-_logObserver = _LogObserver() |
- |
-_wait_is_running = [] |
- |
- |
-class TestCase(_Assertions): |
- """ |
- A unit test. The atom of the unit testing universe. |
- |
- This class extends C{unittest.TestCase} from the standard library. The |
- main feature is the ability to return C{Deferred}s from tests and fixture |
- methods and to have the suite wait for those C{Deferred}s to fire. |
- |
- To write a unit test, subclass C{TestCase} and define a method (say, |
- 'test_foo') on the subclass. To run the test, instantiate your subclass |
- with the name of the method, and call L{run} on the instance, passing a |
- L{TestResult} object. |
- |
- The C{trial} script will automatically find any C{TestCase} subclasses |
- defined in modules beginning with 'test_' and construct test cases for all |
- methods beginning with 'test'. |
- |
- If an error is logged during the test run, the test will fail with an |
- error. See L{log.err}. |
- |
- @ivar failureException: An exception class, defaulting to C{FailTest}. If |
- the test method raises this exception, it will be reported as a failure, |
- rather than an exception. All of the assertion methods raise this if the |
- assertion fails. |
- |
- @ivar skip: C{None} or a string explaining why this test is to be |
- skipped. If defined, the test will not be run. Instead, it will be |
- reported to the result object as 'skipped' (if the C{TestResult} supports |
- skipping). |
- |
- @ivar suppress: C{None} or a list of tuples of C{(args, kwargs)} to be |
- passed to C{warnings.filterwarnings}. Use these to suppress warnings |
- raised in a test. Useful for testing deprecated code. See also |
- L{util.suppress}. |
- |
- @ivar timeout: C{None} or a real number of seconds. If set, the test will |
- raise an error if it takes longer than C{timeout} seconds. |
- |
- @ivar todo: C{None}, a string or a tuple of C{(errors, reason)} where |
- C{errors} is either an exception class or an iterable of exception |
- classes, and C{reason} is a string. See L{Todo} or L{makeTodo} for more |
- information. |
- """ |
- |
- implements(itrial.ITestCase) |
- failureException = FailTest |
- |
- def __init__(self, methodName='runTest'): |
- """ |
- Construct an asynchronous test case for C{methodName}. |
- |
- @param methodName: The name of a method on C{self}. This method should |
- be a unit test. That is, it should be a short method that calls some of |
- the assert* methods. If C{methodName} is unspecified, L{runTest} will |
- be used as the test method. This is mostly useful for testing Trial. |
- """ |
- super(TestCase, self).__init__(methodName) |
- self._testMethodName = methodName |
- testMethod = getattr(self, methodName) |
- self._parents = [testMethod, self] |
- self._parents.extend(util.getPythonContainers(testMethod)) |
- self._shared = (hasattr(self, 'setUpClass') or |
- hasattr(self, 'tearDownClass')) |
- if self._shared: |
- self._prepareClassFixture() |
- if not hasattr(self.__class__, '_instances'): |
- self._initInstances() |
- self.__class__._instances.add(self) |
- self._passed = False |
- self._cleanups = [] |
- |
- def _initInstances(cls): |
- cls._instances = set() |
- cls._instancesRun = set() |
- _initInstances = classmethod(_initInstances) |
- |
- def _isFirst(self): |
- return len(self.__class__._instancesRun) == 0 |
- |
- def _isLast(self): |
- return self.__class__._instancesRun == self.__class__._instances |
- |
- def _prepareClassFixture(self): |
- """Lots of tests assume that test methods all run in the same instance |
- of TestCase. This isn't true. Calling this method ensures that |
- self.__class__._testCaseInstance contains an instance of this class |
- that will remain the same for all tests from this class. |
- """ |
- if not hasattr(self.__class__, '_testCaseInstance'): |
- self.__class__._testCaseInstance = self |
- if self.__class__._testCaseInstance.__class__ != self.__class__: |
- self.__class__._testCaseInstance = self |
- |
- def _run(self, methodName, result): |
- from twisted.internet import reactor |
- timeout = self.getTimeout() |
- def onTimeout(d): |
- e = defer.TimeoutError("%r (%s) still running at %s secs" |
- % (self, methodName, timeout)) |
- f = failure.Failure(e) |
- # try to errback the deferred that the test returns (for no gorram |
- # reason) (see issue1005 and test_errorPropagation in |
- # test_deferred) |
- try: |
- d.errback(f) |
- except defer.AlreadyCalledError: |
- # if the deferred has been called already but the *back chain |
- # is still unfinished, crash the reactor and report timeout |
- # error ourself. |
- reactor.crash() |
- self._timedOut = True # see self._wait |
- todo = self.getTodo() |
- if todo is not None and todo.expected(f): |
- result.addExpectedFailure(self, f, todo) |
- else: |
- result.addError(self, f) |
- onTimeout = utils.suppressWarnings( |
- onTimeout, util.suppress(category=DeprecationWarning)) |
- if self._shared: |
- test = self.__class__._testCaseInstance |
- else: |
- test = self |
- method = getattr(test, methodName) |
- d = defer.maybeDeferred(utils.runWithWarningsSuppressed, |
- self.getSuppress(), method) |
- call = reactor.callLater(timeout, onTimeout, d) |
- d.addBoth(lambda x : call.active() and call.cancel() or x) |
- return d |
- |
- def shortDescription(self): |
- desc = super(TestCase, self).shortDescription() |
- if desc is None: |
- return self._testMethodName |
- return desc |
- |
- def __call__(self, *args, **kwargs): |
- return self.run(*args, **kwargs) |
- |
- def deferSetUpClass(self, result): |
- if not hasattr(self, 'setUpClass'): |
- d = defer.succeed(None) |
- d.addCallback(self.deferSetUp, result) |
- return d |
- d = self._run('setUpClass', result) |
- d.addCallbacks(self.deferSetUp, self._ebDeferSetUpClass, |
- callbackArgs=(result,), |
- errbackArgs=(result,)) |
- return d |
- |
- def _ebDeferSetUpClass(self, error, result): |
- if error.check(SkipTest): |
- result.addSkip(self, self._getReason(error)) |
- self.__class__._instancesRun.remove(self) |
- elif error.check(KeyboardInterrupt): |
- result.stop() |
- else: |
- result.addError(self, error) |
- self.__class__._instancesRun.remove(self) |
- |
- def deferSetUp(self, ignored, result): |
- d = self._run('setUp', result) |
- d.addCallbacks(self.deferTestMethod, self._ebDeferSetUp, |
- callbackArgs=(result,), |
- errbackArgs=(result,)) |
- return d |
- |
- def _ebDeferSetUp(self, failure, result): |
- if failure.check(SkipTest): |
- result.addSkip(self, self._getReason(failure)) |
- else: |
- result.addError(self, failure) |
- if failure.check(KeyboardInterrupt): |
- result.stop() |
- return self.deferRunCleanups(None, result) |
- |
- def deferTestMethod(self, ignored, result): |
- d = self._run(self._testMethodName, result) |
- d.addCallbacks(self._cbDeferTestMethod, self._ebDeferTestMethod, |
- callbackArgs=(result,), |
- errbackArgs=(result,)) |
- d.addBoth(self.deferRunCleanups, result) |
- d.addBoth(self.deferTearDown, result) |
- if self._shared and hasattr(self, 'tearDownClass') and self._isLast(): |
- d.addBoth(self.deferTearDownClass, result) |
- return d |
- |
- def _cbDeferTestMethod(self, ignored, result): |
- if self.getTodo() is not None: |
- result.addUnexpectedSuccess(self, self.getTodo()) |
- else: |
- self._passed = True |
- return ignored |
- |
- def _ebDeferTestMethod(self, f, result): |
- todo = self.getTodo() |
- if todo is not None and todo.expected(f): |
- result.addExpectedFailure(self, f, todo) |
- elif f.check(self.failureException, FailTest): |
- result.addFailure(self, f) |
- elif f.check(KeyboardInterrupt): |
- result.addError(self, f) |
- result.stop() |
- elif f.check(SkipTest): |
- result.addSkip(self, self._getReason(f)) |
- else: |
- result.addError(self, f) |
- |
- def deferTearDown(self, ignored, result): |
- d = self._run('tearDown', result) |
- d.addErrback(self._ebDeferTearDown, result) |
- return d |
- |
- def _ebDeferTearDown(self, failure, result): |
- result.addError(self, failure) |
- if failure.check(KeyboardInterrupt): |
- result.stop() |
- self._passed = False |
- |
- def deferRunCleanups(self, ignored, result): |
- """ |
- Run any scheduled cleanups and report errors (if any to the result |
- object. |
- """ |
- d = self._runCleanups() |
- d.addCallback(self._cbDeferRunCleanups, result) |
- return d |
- |
- def _cbDeferRunCleanups(self, cleanupResults, result): |
- for flag, failure in cleanupResults: |
- if flag == defer.FAILURE: |
- result.addError(self, failure) |
- if failure.check(KeyboardInterrupt): |
- result.stop() |
- self._passed = False |
- |
- def deferTearDownClass(self, ignored, result): |
- d = self._run('tearDownClass', result) |
- d.addErrback(self._ebTearDownClass, result) |
- return d |
- |
- def _ebTearDownClass(self, error, result): |
- if error.check(KeyboardInterrupt): |
- result.stop() |
- result.addError(self, error) |
- |
- def _cleanUp(self, result): |
- try: |
- clean = util._Janitor(self, result).postCaseCleanup() |
- if not clean: |
- self._passed = False |
- except: |
- result.addError(self, failure.Failure()) |
- self._passed = False |
- for error in self._observer.getErrors(): |
- result.addError(self, error) |
- self._passed = False |
- self.flushLoggedErrors() |
- self._removeObserver() |
- if self._passed: |
- result.addSuccess(self) |
- |
- def _classCleanUp(self, result): |
- try: |
- util._Janitor(self, result).postClassCleanup() |
- except: |
- result.addError(self, failure.Failure()) |
- |
- def _makeReactorMethod(self, name): |
- """ |
- Create a method which wraps the reactor method C{name}. The new |
- method issues a deprecation warning and calls the original. |
- """ |
- def _(*a, **kw): |
- warnings.warn("reactor.%s cannot be used inside unit tests. " |
- "In the future, using %s will fail the test and may " |
- "crash or hang the test run." |
- % (name, name), |
- stacklevel=2, category=DeprecationWarning) |
- return self._reactorMethods[name](*a, **kw) |
- return _ |
- |
- def _deprecateReactor(self, reactor): |
- """ |
- Deprecate C{iterate}, C{crash} and C{stop} on C{reactor}. That is, |
- each method is wrapped in a function that issues a deprecation |
- warning, then calls the original. |
- |
- @param reactor: The Twisted reactor. |
- """ |
- self._reactorMethods = {} |
- for name in ['crash', 'iterate', 'stop']: |
- self._reactorMethods[name] = getattr(reactor, name) |
- setattr(reactor, name, self._makeReactorMethod(name)) |
- |
- def _undeprecateReactor(self, reactor): |
- """ |
- Restore the deprecated reactor methods. Undoes what |
- L{_deprecateReactor} did. |
- |
- @param reactor: The Twisted reactor. |
- """ |
- for name, method in self._reactorMethods.iteritems(): |
- setattr(reactor, name, method) |
- self._reactorMethods = {} |
- |
- def _installObserver(self): |
- self._observer = _logObserver |
- self._observer._add() |
- |
- def _removeObserver(self): |
- self._observer._remove() |
- |
- def flushLoggedErrors(self, *errorTypes): |
- """ |
- Remove stored errors received from the log. |
- |
- C{TestCase} stores each error logged during the run of the test and |
- reports them as errors during the cleanup phase (after C{tearDown}). |
- |
- @param *errorTypes: If unspecifed, flush all errors. Otherwise, only |
- flush errors that match the given types. |
- |
- @return: A list of failures that have been removed. |
- """ |
- return self._observer.flushErrors(*errorTypes) |
- |
- |
- def addCleanup(self, f, *args, **kwargs): |
- """ |
- Add the given function to a list of functions to be called after the |
- test has run, but before C{tearDown}. |
- |
- Functions will be run in reverse order of being added. This helps |
- ensure that tear down complements set up. |
- |
- The function C{f} may return a Deferred. If so, C{TestCase} will wait |
- until the Deferred has fired before proceeding to the next function. |
- """ |
- self._cleanups.append((f, args, kwargs)) |
- |
- |
- def _captureDeprecationWarnings(self, f, *args, **kwargs): |
- """ |
- Call C{f} and capture all deprecation warnings. |
- """ |
- warnings = [] |
- def accumulateDeprecations(message, category, stacklevel): |
- self.assertEqual(DeprecationWarning, category) |
- self.assertEqual(stacklevel, 2) |
- warnings.append(message) |
- |
- originalMethod = deprecate.getWarningMethod() |
- deprecate.setWarningMethod(accumulateDeprecations) |
- try: |
- result = f(*args, **kwargs) |
- finally: |
- deprecate.setWarningMethod(originalMethod) |
- return (warnings, result) |
- |
- |
- def callDeprecated(self, version, f, *args, **kwargs): |
- """ |
- Call a function that was deprecated at a specific version. |
- |
- @param version: The version that the function was deprecated in. |
- @param f: The deprecated function to call. |
- @return: Whatever the function returns. |
- """ |
- warnings, result = self._captureDeprecationWarnings( |
- f, *args, **kwargs) |
- |
- if len(warnings) == 0: |
- self.fail('%r is not deprecated.' % (f,)) |
- |
- observedWarning = warnings[0] |
- expectedWarning = getDeprecationWarningString(f, version) |
- self.assertEqual(expectedWarning, observedWarning) |
- |
- return result |
- |
- |
- def _runCleanups(self): |
- """ |
- Run the cleanups added with L{addCleanup} in order. |
- |
- @return: A C{Deferred} that fires when all cleanups are run. |
- """ |
- def _makeFunction(f, args, kwargs): |
- return lambda: f(*args, **kwargs) |
- callables = [] |
- while len(self._cleanups) > 0: |
- f, args, kwargs = self._cleanups.pop() |
- callables.append(_makeFunction(f, args, kwargs)) |
- return util._runSequentially(callables) |
- |
- |
- def patch(self, obj, attribute, value): |
- """ |
- Monkey patch an object for the duration of the test. |
- |
- The monkey patch will be reverted at the end of the test using the |
- L{addCleanup} mechanism. |
- |
- The L{MonkeyPatcher} is returned so that users can restore and |
- re-apply the monkey patch within their tests. |
- |
- @param obj: The object to monkey patch. |
- @param attribute: The name of the attribute to change. |
- @param value: The value to set the attribute to. |
- @return: A L{monkey.MonkeyPatcher} object. |
- """ |
- monkeyPatch = monkey.MonkeyPatcher((obj, attribute, value)) |
- monkeyPatch.patch() |
- self.addCleanup(monkeyPatch.restore) |
- return monkeyPatch |
- |
- |
- def runTest(self): |
- """ |
- If no C{methodName} argument is passed to the constructor, L{run} will |
- treat this method as the thing with the actual test inside. |
- """ |
- |
- |
- def run(self, result): |
- """ |
- Run the test case, storing the results in C{result}. |
- |
- First runs C{setUp} on self, then runs the test method (defined in the |
- constructor), then runs C{tearDown}. Any of these may return |
- L{Deferred}s. After they complete, does some reactor cleanup. |
- |
- @param result: A L{TestResult} object. |
- """ |
- log.msg("--> %s <--" % (self.id())) |
- from twisted.internet import reactor |
- new_result = itrial.IReporter(result, None) |
- if new_result is None: |
- result = PyUnitResultAdapter(result) |
- else: |
- result = new_result |
- self._timedOut = False |
- if self._shared and self not in self.__class__._instances: |
- self.__class__._instances.add(self) |
- result.startTest(self) |
- if self.getSkip(): # don't run test methods that are marked as .skip |
- result.addSkip(self, self.getSkip()) |
- result.stopTest(self) |
- return |
- self._installObserver() |
- self._passed = False |
- first = False |
- if self._shared: |
- first = self._isFirst() |
- self.__class__._instancesRun.add(self) |
- self._deprecateReactor(reactor) |
- try: |
- if first: |
- d = self.deferSetUpClass(result) |
- else: |
- d = self.deferSetUp(None, result) |
- try: |
- self._wait(d) |
- finally: |
- self._cleanUp(result) |
- result.stopTest(self) |
- if self._shared and self._isLast(): |
- self._initInstances() |
- self._classCleanUp(result) |
- if not self._shared: |
- self._classCleanUp(result) |
- finally: |
- self._undeprecateReactor(reactor) |
- |
- def _getReason(self, f): |
- if len(f.value.args) > 0: |
- reason = f.value.args[0] |
- else: |
- warnings.warn(("Do not raise unittest.SkipTest with no " |
- "arguments! Give a reason for skipping tests!"), |
- stacklevel=2) |
- reason = f |
- return reason |
- |
- def getSkip(self): |
- """ |
- Return the skip reason set on this test, if any is set. Checks on the |
- instance first, then the class, then the module, then packages. As |
- soon as it finds something with a C{skip} attribute, returns that. |
- Returns C{None} if it cannot find anything. See L{TestCase} docstring |
- for more details. |
- """ |
- return util.acquireAttribute(self._parents, 'skip', None) |
- |
- def getTodo(self): |
- """ |
- Return a L{Todo} object if the test is marked todo. Checks on the |
- instance first, then the class, then the module, then packages. As |
- soon as it finds something with a C{todo} attribute, returns that. |
- Returns C{None} if it cannot find anything. See L{TestCase} docstring |
- for more details. |
- """ |
- todo = util.acquireAttribute(self._parents, 'todo', None) |
- if todo is None: |
- return None |
- return makeTodo(todo) |
- |
- def getTimeout(self): |
- """ |
- Returns the timeout value set on this test. Checks on the instance |
- first, then the class, then the module, then packages. As soon as it |
- finds something with a C{timeout} attribute, returns that. Returns |
- L{util.DEFAULT_TIMEOUT_DURATION} if it cannot find anything. See |
- L{TestCase} docstring for more details. |
- """ |
- timeout = util.acquireAttribute(self._parents, 'timeout', |
- util.DEFAULT_TIMEOUT_DURATION) |
- try: |
- return float(timeout) |
- except (ValueError, TypeError): |
- # XXX -- this is here because sometimes people will have methods |
- # called 'timeout', or set timeout to 'orange', or something |
- # Particularly, test_news.NewsTestCase and ReactorCoreTestCase |
- # both do this. |
- warnings.warn("'timeout' attribute needs to be a number.", |
- category=DeprecationWarning) |
- return util.DEFAULT_TIMEOUT_DURATION |
- |
- def getSuppress(self): |
- """ |
- Returns any warning suppressions set for this test. Checks on the |
- instance first, then the class, then the module, then packages. As |
- soon as it finds something with a C{suppress} attribute, returns that. |
- Returns any empty list (i.e. suppress no warnings) if it cannot find |
- anything. See L{TestCase} docstring for more details. |
- """ |
- return util.acquireAttribute(self._parents, 'suppress', []) |
- |
- |
- def visit(self, visitor): |
- """ |
- Visit this test case. Call C{visitor} with C{self} as a parameter. |
- |
- Deprecated in Twisted 8.0. |
- |
- @param visitor: A callable which expects a single parameter: a test |
- case. |
- |
- @return: None |
- """ |
- warnings.warn("Test visitors deprecated in Twisted 8.0", |
- category=DeprecationWarning) |
- visitor(self) |
- |
- |
- def mktemp(self): |
- """Returns a unique name that may be used as either a temporary |
- directory or filename. |
- |
- @note: you must call os.mkdir on the value returned from this |
- method if you wish to use it as a directory! |
- """ |
- MAX_FILENAME = 32 # some platforms limit lengths of filenames |
- base = os.path.join(self.__class__.__module__[:MAX_FILENAME], |
- self.__class__.__name__[:MAX_FILENAME], |
- self._testMethodName[:MAX_FILENAME]) |
- if not os.path.exists(base): |
- os.makedirs(base) |
- dirname = tempfile.mkdtemp('', '', base) |
- return os.path.join(dirname, 'temp') |
- |
- def _wait(self, d, running=_wait_is_running): |
- """Take a Deferred that only ever callbacks. Block until it happens. |
- """ |
- from twisted.internet import reactor |
- if running: |
- raise RuntimeError("_wait is not reentrant") |
- |
- results = [] |
- def append(any): |
- if results is not None: |
- results.append(any) |
- def crash(ign): |
- if results is not None: |
- reactor.crash() |
- crash = utils.suppressWarnings( |
- crash, util.suppress(message=r'reactor\.crash cannot be used.*', |
- category=DeprecationWarning)) |
- def stop(): |
- reactor.crash() |
- stop = utils.suppressWarnings( |
- stop, util.suppress(message=r'reactor\.crash cannot be used.*', |
- category=DeprecationWarning)) |
- |
- running.append(None) |
- try: |
- d.addBoth(append) |
- if results: |
- # d might have already been fired, in which case append is |
- # called synchronously. Avoid any reactor stuff. |
- return |
- d.addBoth(crash) |
- reactor.stop = stop |
- try: |
- reactor.run() |
- finally: |
- del reactor.stop |
- |
- # If the reactor was crashed elsewhere due to a timeout, hopefully |
- # that crasher also reported an error. Just return. |
- # _timedOut is most likely to be set when d has fired but hasn't |
- # completed its callback chain (see self._run) |
- if results or self._timedOut: #defined in run() and _run() |
- return |
- |
- # If the timeout didn't happen, and we didn't get a result or |
- # a failure, then the user probably aborted the test, so let's |
- # just raise KeyboardInterrupt. |
- |
- # FIXME: imagine this: |
- # web/test/test_webclient.py: |
- # exc = self.assertRaises(error.Error, wait, method(url)) |
- # |
- # wait() will raise KeyboardInterrupt, and assertRaises will |
- # swallow it. Therefore, wait() raising KeyboardInterrupt is |
- # insufficient to stop trial. A suggested solution is to have |
- # this code set a "stop trial" flag, or otherwise notify trial |
- # that it should really try to stop as soon as possible. |
- raise KeyboardInterrupt() |
- finally: |
- results = None |
- running.pop() |
- |
- |
-class UnsupportedTrialFeature(Exception): |
- """A feature of twisted.trial was used that pyunit cannot support.""" |
- |
- |
-class PyUnitResultAdapter(object): |
- """ |
- Wrap a C{TestResult} from the standard library's C{unittest} so that it |
- supports the extended result types from Trial, and also supports |
- L{twisted.python.failure.Failure}s being passed to L{addError} and |
- L{addFailure}. |
- """ |
- |
- def __init__(self, original): |
- """ |
- @param original: A C{TestResult} instance from C{unittest}. |
- """ |
- self.original = original |
- |
- def _exc_info(self, err): |
- if isinstance(err, failure.Failure): |
- # Unwrap the Failure into a exc_info tuple. |
- err = (err.type, err.value, err.getTracebackObject()) |
- return err |
- |
- def startTest(self, method): |
- self.original.startTest(method) |
- |
- def stopTest(self, method): |
- self.original.stopTest(method) |
- |
- def addFailure(self, test, fail): |
- self.original.addFailure(test, self._exc_info(fail)) |
- |
- def addError(self, test, error): |
- self.original.addError(test, self._exc_info(error)) |
- |
- def _unsupported(self, test, feature, info): |
- self.original.addFailure( |
- test, |
- (UnsupportedTrialFeature, |
- UnsupportedTrialFeature(feature, info), |
- None)) |
- |
- def addSkip(self, test, reason): |
- """ |
- Report the skip as a failure. |
- """ |
- self._unsupported(test, 'skip', reason) |
- |
- def addUnexpectedSuccess(self, test, todo): |
- """ |
- Report the unexpected success as a failure. |
- """ |
- self._unsupported(test, 'unexpected success', todo) |
- |
- def addExpectedFailure(self, test, error): |
- """ |
- Report the expected failure (i.e. todo) as a failure. |
- """ |
- self._unsupported(test, 'expected failure', error) |
- |
- def addSuccess(self, test): |
- self.original.addSuccess(test) |
- |
- def upDownError(self, method, error, warn, printStatus): |
- pass |
- |
- |
- |
-def suiteVisit(suite, visitor): |
- """ |
- Visit each test in C{suite} with C{visitor}. |
- |
- Deprecated in Twisted 8.0. |
- |
- @param visitor: A callable which takes a single argument, the L{TestCase} |
- instance to visit. |
- @return: None |
- """ |
- warnings.warn("Test visitors deprecated in Twisted 8.0", |
- category=DeprecationWarning) |
- for case in suite._tests: |
- visit = getattr(case, 'visit', None) |
- if visit is not None: |
- visit(visitor) |
- elif isinstance(case, pyunit.TestCase): |
- case = itrial.ITestCase(case) |
- case.visit(visitor) |
- elif isinstance(case, pyunit.TestSuite): |
- suiteVisit(case, visitor) |
- else: |
- case.visit(visitor) |
- |
- |
- |
-class TestSuite(pyunit.TestSuite): |
- """ |
- Extend the standard library's C{TestSuite} with support for the visitor |
- pattern and a consistently overrideable C{run} method. |
- """ |
- |
- visit = suiteVisit |
- |
- def __call__(self, result): |
- return self.run(result) |
- |
- |
- def run(self, result): |
- """ |
- Call C{run} on every member of the suite. |
- """ |
- # we implement this because Python 2.3 unittest defines this code |
- # in __call__, whereas 2.4 defines the code in run. |
- for test in self._tests: |
- if result.shouldStop: |
- break |
- test(result) |
- return result |
- |
- |
- |
-class TestDecorator(components.proxyForInterface(itrial.ITestCase, |
- "_originalTest")): |
- """ |
- Decorator for test cases. |
- |
- @param _originalTest: The wrapped instance of test. |
- @type _originalTest: A provider of L{itrial.ITestCase} |
- """ |
- |
- implements(itrial.ITestCase) |
- |
- |
- def __call__(self, result): |
- """ |
- Run the unit test. |
- |
- @param result: A TestResult object. |
- """ |
- return self.run(result) |
- |
- |
- def run(self, result): |
- """ |
- Run the unit test. |
- |
- @param result: A TestResult object. |
- """ |
- return self._originalTest.run( |
- reporter._AdaptedReporter(result, self.__class__)) |
- |
- |
- |
-def _clearSuite(suite): |
- """ |
- Clear all tests from C{suite}. |
- |
- This messes with the internals of C{suite}. In particular, it assumes that |
- the suite keeps all of its tests in a list in an instance variable called |
- C{_tests}. |
- """ |
- suite._tests = [] |
- |
- |
-def decorate(test, decorator): |
- """ |
- Decorate all test cases in C{test} with C{decorator}. |
- |
- C{test} can be a test case or a test suite. If it is a test suite, then the |
- structure of the suite is preserved. |
- |
- L{decorate} tries to preserve the class of the test suites it finds, but |
- assumes the presence of the C{_tests} attribute on the suite. |
- |
- @param test: The C{TestCase} or C{TestSuite} to decorate. |
- |
- @param decorator: A unary callable used to decorate C{TestCase}s. |
- |
- @return: A decorated C{TestCase} or a C{TestSuite} containing decorated |
- C{TestCase}s. |
- """ |
- |
- try: |
- tests = iter(test) |
- except TypeError: |
- return decorator(test) |
- |
- # At this point, we know that 'test' is a test suite. |
- _clearSuite(test) |
- |
- for case in tests: |
- test.addTest(decorate(case, decorator)) |
- return test |
- |
- |
- |
-class _PyUnitTestCaseAdapter(TestDecorator): |
- """ |
- Adapt from pyunit.TestCase to ITestCase. |
- """ |
- |
- |
- def visit(self, visitor): |
- """ |
- Deprecated in Twisted 8.0. |
- """ |
- warnings.warn("Test visitors deprecated in Twisted 8.0", |
- category=DeprecationWarning) |
- visitor(self) |
- |
- |
- |
-class _BrokenIDTestCaseAdapter(_PyUnitTestCaseAdapter): |
- """ |
- Adapter for pyunit-style C{TestCase} subclasses that have undesirable id() |
- methods. That is L{pyunit.FunctionTestCase} and L{pyunit.DocTestCase}. |
- """ |
- |
- def id(self): |
- """ |
- Return the fully-qualified Python name of the doctest. |
- """ |
- testID = self._originalTest.shortDescription() |
- if testID is not None: |
- return testID |
- return self._originalTest.id() |
- |
- |
- |
-class _ForceGarbageCollectionDecorator(TestDecorator): |
- """ |
- Forces garbage collection to be run before and after the test. Any errors |
- logged during the post-test collection are added to the test result as |
- errors. |
- """ |
- |
- def run(self, result): |
- gc.collect() |
- TestDecorator.run(self, result) |
- _logObserver._add() |
- gc.collect() |
- for error in _logObserver.getErrors(): |
- result.addError(self, error) |
- _logObserver.flushErrors() |
- _logObserver._remove() |
- |
- |
-components.registerAdapter( |
- _PyUnitTestCaseAdapter, pyunit.TestCase, itrial.ITestCase) |
- |
- |
-components.registerAdapter( |
- _BrokenIDTestCaseAdapter, pyunit.FunctionTestCase, itrial.ITestCase) |
- |
- |
-_docTestCase = getattr(doctest, 'DocTestCase', None) |
-if _docTestCase: |
- components.registerAdapter( |
- _BrokenIDTestCaseAdapter, _docTestCase, itrial.ITestCase) |
- |
- |
-def _iterateTests(testSuiteOrCase): |
- """ |
- Iterate through all of the test cases in C{testSuiteOrCase}. |
- """ |
- try: |
- suite = iter(testSuiteOrCase) |
- except TypeError: |
- yield testSuiteOrCase |
- else: |
- for test in suite: |
- for subtest in _iterateTests(test): |
- yield subtest |
- |
- |
- |
-# Support for Python 2.3 |
-try: |
- iter(pyunit.TestSuite()) |
-except TypeError: |
- # Python 2.3's TestSuite doesn't support iteration. Let's monkey patch it! |
- def __iter__(self): |
- return iter(self._tests) |
- pyunit.TestSuite.__iter__ = __iter__ |
- |
- |
- |
-class _SubTestCase(TestCase): |
- def __init__(self): |
- TestCase.__init__(self, 'run') |
- |
-_inst = _SubTestCase() |
- |
-def _deprecate(name): |
- """ |
- Internal method used to deprecate top-level assertions. Do not use this. |
- """ |
- def _(*args, **kwargs): |
- warnings.warn("unittest.%s is deprecated. Instead use the %r " |
- "method on unittest.TestCase" % (name, name), |
- stacklevel=2, category=DeprecationWarning) |
- return getattr(_inst, name)(*args, **kwargs) |
- return _ |
- |
- |
-_assertions = ['fail', 'failUnlessEqual', 'failIfEqual', 'failIfEquals', |
- 'failUnless', 'failUnlessIdentical', 'failUnlessIn', |
- 'failIfIdentical', 'failIfIn', 'failIf', |
- 'failUnlessAlmostEqual', 'failIfAlmostEqual', |
- 'failUnlessRaises', 'assertApproximates', |
- 'assertFailure', 'failUnlessSubstring', 'failIfSubstring', |
- 'assertAlmostEqual', 'assertAlmostEquals', |
- 'assertNotAlmostEqual', 'assertNotAlmostEquals', 'assertEqual', |
- 'assertEquals', 'assertNotEqual', 'assertNotEquals', |
- 'assertRaises', 'assert_', 'assertIdentical', |
- 'assertNotIdentical', 'assertIn', 'assertNotIn', |
- 'failUnlessFailure', 'assertSubstring', 'assertNotSubstring'] |
- |
- |
-for methodName in _assertions: |
- globals()[methodName] = _deprecate(methodName) |
- |
- |
-__all__ = ['TestCase', 'wait', 'FailTest', 'SkipTest'] |
- |