Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(31)

Unified Diff: third_party/twisted_8_1/twisted/trial/unittest.py

Issue 12261012: Remove third_party/twisted_8_1 (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/build
Patch Set: Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « third_party/twisted_8_1/twisted/trial/test/weird.py ('k') | third_party/twisted_8_1/twisted/trial/util.py » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: third_party/twisted_8_1/twisted/trial/unittest.py
diff --git a/third_party/twisted_8_1/twisted/trial/unittest.py b/third_party/twisted_8_1/twisted/trial/unittest.py
deleted file mode 100644
index a95218c23ac73cc510932c0b397267393ae41f87..0000000000000000000000000000000000000000
--- a/third_party/twisted_8_1/twisted/trial/unittest.py
+++ /dev/null
@@ -1,1454 +0,0 @@
-# -*- test-case-name: twisted.trial.test.test_tests -*-
-#
-# Copyright (c) 2001-2007 Twisted Matrix Laboratories.
-# See LICENSE for details.
-
-"""
-Things likely to be used by writers of unit tests.
-
-Maintainer: Jonathan Lange <jml@twistedmatrix.com>
-"""
-
-
-import doctest
-import os, warnings, sys, tempfile, gc
-from pprint import pformat
-
-from twisted.internet import defer, utils
-from twisted.python import components, failure, log, monkey
-from twisted.python.compat import set
-from twisted.python import deprecate
-from twisted.python.deprecate import getDeprecationWarningString
-
-from twisted.trial import itrial, reporter, util
-
-pyunit = __import__('unittest')
-
-from zope.interface import implements
-
-
-
-class SkipTest(Exception):
- """
- Raise this (with a reason) to skip the current test. You may also set
- method.skip to a reason string to skip it, or set class.skip to skip the
- entire TestCase.
- """
-
-
-class FailTest(AssertionError):
- """Raised to indicate the current test has failed to pass."""
-
-
-class Todo(object):
- """
- Internal object used to mark a L{TestCase} as 'todo'. Tests marked 'todo'
- are reported differently in Trial L{TestResult}s. If todo'd tests fail,
- they do not fail the suite and the errors are reported in a separate
- category. If todo'd tests succeed, Trial L{TestResult}s will report an
- unexpected success.
- """
-
- def __init__(self, reason, errors=None):
- """
- @param reason: A string explaining why the test is marked 'todo'
-
- @param errors: An iterable of exception types that the test is
- expected to raise. If one of these errors is raised by the test, it
- will be trapped. Raising any other kind of error will fail the test.
- If C{None} is passed, then all errors will be trapped.
- """
- self.reason = reason
- self.errors = errors
-
- def __repr__(self):
- return "<Todo reason=%r errors=%r>" % (self.reason, self.errors)
-
- def expected(self, failure):
- """
- @param failure: A L{twisted.python.failure.Failure}.
-
- @return: C{True} if C{failure} is expected, C{False} otherwise.
- """
- if self.errors is None:
- return True
- for error in self.errors:
- if failure.check(error):
- return True
- return False
-
-
-def makeTodo(value):
- """
- Return a L{Todo} object built from C{value}.
-
- If C{value} is a string, return a Todo that expects any exception with
- C{value} as a reason. If C{value} is a tuple, the second element is used
- as the reason and the first element as the excepted error(s).
-
- @param value: A string or a tuple of C{(errors, reason)}, where C{errors}
- is either a single exception class or an iterable of exception classes.
-
- @return: A L{Todo} object.
- """
- if isinstance(value, str):
- return Todo(reason=value)
- if isinstance(value, tuple):
- errors, reason = value
- try:
- errors = list(errors)
- except TypeError:
- errors = [errors]
- return Todo(reason=reason, errors=errors)
-
-
-class _Assertions(pyunit.TestCase, object):
- """
- Replaces many of the built-in TestCase assertions. In general, these
- assertions provide better error messages and are easier to use in
- callbacks. Also provides new assertions such as L{failUnlessFailure}.
-
- Although the tests are defined as 'failIf*' and 'failUnless*', they can
- also be called as 'assertNot*' and 'assert*'.
- """
-
- def fail(self, msg=None):
- """absolutely fails the test, do not pass go, do not collect $200
-
- @param msg: the message that will be displayed as the reason for the
- failure
- """
- raise self.failureException(msg)
-
- def failIf(self, condition, msg=None):
- """fails the test if C{condition} evaluates to False
-
- @param condition: any object that defines __nonzero__
- """
- if condition:
- raise self.failureException(msg)
- return condition
- assertNot = assertFalse = failUnlessFalse = failIf
-
- def failUnless(self, condition, msg=None):
- """fails the test if C{condition} evaluates to True
-
- @param condition: any object that defines __nonzero__
- """
- if not condition:
- raise self.failureException(msg)
- return condition
- assert_ = assertTrue = failUnlessTrue = failUnless
-
- def failUnlessRaises(self, exception, f, *args, **kwargs):
- """fails the test unless calling the function C{f} with the given C{args}
- and C{kwargs} raises C{exception}. The failure will report the
- traceback and call stack of the unexpected exception.
-
- @param exception: exception type that is to be expected
- @param f: the function to call
-
- @return: The raised exception instance, if it is of the given type.
- @raise self.failureException: Raised if the function call does not raise an exception
- or if it raises an exception of a different type.
- """
- try:
- result = f(*args, **kwargs)
- except exception, inst:
- return inst
- except:
- raise self.failureException('%s raised instead of %s:\n %s'
- % (sys.exc_info()[0],
- exception.__name__,
- failure.Failure().getTraceback()))
- else:
- raise self.failureException('%s not raised (%r returned)'
- % (exception.__name__, result))
- assertRaises = failUnlessRaises
-
- def failUnlessEqual(self, first, second, msg=''):
- """
- Fail the test if C{first} and C{second} are not equal.
-
- @param msg: A string describing the failure that's included in the
- exception.
- """
- if not first == second:
- if msg is None:
- msg = ''
- if len(msg) > 0:
- msg += '\n'
- raise self.failureException(
- '%snot equal:\na = %s\nb = %s\n'
- % (msg, pformat(first), pformat(second)))
- return first
- assertEqual = assertEquals = failUnlessEquals = failUnlessEqual
-
- def failUnlessIdentical(self, first, second, msg=None):
- """fail the test if C{first} is not C{second}. This is an
- obect-identity-equality test, not an object equality (i.e. C{__eq__}) test
-
- @param msg: if msg is None, then the failure message will be
- '%r is not %r' % (first, second)
- """
- if first is not second:
- raise self.failureException(msg or '%r is not %r' % (first, second))
- return first
- assertIdentical = failUnlessIdentical
-
- def failIfIdentical(self, first, second, msg=None):
- """fail the test if C{first} is C{second}. This is an
- obect-identity-equality test, not an object equality (i.e. C{__eq__}) test
-
- @param msg: if msg is None, then the failure message will be
- '%r is %r' % (first, second)
- """
- if first is second:
- raise self.failureException(msg or '%r is %r' % (first, second))
- return first
- assertNotIdentical = failIfIdentical
-
- def failIfEqual(self, first, second, msg=None):
- """fail the test if C{first} == C{second}
-
- @param msg: if msg is None, then the failure message will be
- '%r == %r' % (first, second)
- """
- if not first != second:
- raise self.failureException(msg or '%r == %r' % (first, second))
- return first
- assertNotEqual = assertNotEquals = failIfEquals = failIfEqual
-
- def failUnlessIn(self, containee, container, msg=None):
- """fail the test if C{containee} is not found in C{container}
-
- @param containee: the value that should be in C{container}
- @param container: a sequence type, or in the case of a mapping type,
- will follow semantics of 'if key in dict.keys()'
- @param msg: if msg is None, then the failure message will be
- '%r not in %r' % (first, second)
- """
- if containee not in container:
- raise self.failureException(msg or "%r not in %r"
- % (containee, container))
- return containee
- assertIn = failUnlessIn
-
- def failIfIn(self, containee, container, msg=None):
- """fail the test if C{containee} is found in C{container}
-
- @param containee: the value that should not be in C{container}
- @param container: a sequence type, or in the case of a mapping type,
- will follow semantics of 'if key in dict.keys()'
- @param msg: if msg is None, then the failure message will be
- '%r in %r' % (first, second)
- """
- if containee in container:
- raise self.failureException(msg or "%r in %r"
- % (containee, container))
- return containee
- assertNotIn = failIfIn
-
- def failIfAlmostEqual(self, first, second, places=7, msg=None):
- """Fail if the two objects are equal as determined by their
- difference rounded to the given number of decimal places
- (default 7) and comparing to zero.
-
- @note: decimal places (from zero) is usually not the same
- as significant digits (measured from the most
- signficant digit).
-
- @note: included for compatiblity with PyUnit test cases
- """
- if round(second-first, places) == 0:
- raise self.failureException(msg or '%r == %r within %r places'
- % (first, second, places))
- return first
- assertNotAlmostEqual = assertNotAlmostEquals = failIfAlmostEqual
- failIfAlmostEquals = failIfAlmostEqual
-
- def failUnlessAlmostEqual(self, first, second, places=7, msg=None):
- """Fail if the two objects are unequal as determined by their
- difference rounded to the given number of decimal places
- (default 7) and comparing to zero.
-
- @note: decimal places (from zero) is usually not the same
- as significant digits (measured from the most
- signficant digit).
-
- @note: included for compatiblity with PyUnit test cases
- """
- if round(second-first, places) != 0:
- raise self.failureException(msg or '%r != %r within %r places'
- % (first, second, places))
- return first
- assertAlmostEqual = assertAlmostEquals = failUnlessAlmostEqual
- failUnlessAlmostEquals = failUnlessAlmostEqual
-
- def failUnlessApproximates(self, first, second, tolerance, msg=None):
- """asserts that C{first} - C{second} > C{tolerance}
-
- @param msg: if msg is None, then the failure message will be
- '%r ~== %r' % (first, second)
- """
- if abs(first - second) > tolerance:
- raise self.failureException(msg or "%s ~== %s" % (first, second))
- return first
- assertApproximates = failUnlessApproximates
-
- def failUnlessFailure(self, deferred, *expectedFailures):
- """Assert that C{deferred} will errback with one of
- C{expectedFailures}. Returns the original Deferred with callbacks
- added. You will need to return this Deferred from your test case.
- """
- def _cb(ignore):
- raise self.failureException(
- "did not catch an error, instead got %r" % (ignore,))
-
- def _eb(failure):
- if failure.check(*expectedFailures):
- return failure.value
- else:
- output = ('\nExpected: %r\nGot:\n%s'
- % (expectedFailures, str(failure)))
- raise self.failureException(output)
- return deferred.addCallbacks(_cb, _eb)
- assertFailure = failUnlessFailure
-
- def failUnlessSubstring(self, substring, astring, msg=None):
- return self.failUnlessIn(substring, astring, msg)
- assertSubstring = failUnlessSubstring
-
- def failIfSubstring(self, substring, astring, msg=None):
- return self.failIfIn(substring, astring, msg)
- assertNotSubstring = failIfSubstring
-
- def failUnlessWarns(self, category, message, filename, f,
- *args, **kwargs):
- """
- Fail if the given function doesn't generate the specified warning when
- called. It calls the function, checks the warning, and forwards the
- result of the function if everything is fine.
-
- @param category: the category of the warning to check.
- @param message: the output message of the warning to check.
- @param filename: the filename where the warning should come from.
- @param f: the function which is supposed to generate the warning.
- @type f: any callable.
- @param args: the arguments to C{f}.
- @param kwargs: the keywords arguments to C{f}.
-
- @return: the result of the original function C{f}.
- """
- warningsShown = []
- def warnExplicit(*args):
- warningsShown.append(args)
-
- origExplicit = warnings.warn_explicit
- try:
- warnings.warn_explicit = warnExplicit
- result = f(*args, **kwargs)
- finally:
- warnings.warn_explicit = origExplicit
-
- if not warningsShown:
- self.fail("No warnings emitted")
- first = warningsShown[0]
- for other in warningsShown[1:]:
- if other[:2] != first[:2]:
- self.fail("Can't handle different warnings")
- gotMessage, gotCategory, gotFilename, lineno = first[:4]
- self.assertEqual(gotMessage, message)
- self.assertIdentical(gotCategory, category)
-
- # Use starts with because of .pyc/.pyo issues.
- self.failUnless(
- filename.startswith(gotFilename),
- 'Warning in %r, expected %r' % (gotFilename, filename))
-
- # It would be nice to be able to check the line number as well, but
- # different configurations actually end up reporting different line
- # numbers (generally the variation is only 1 line, but that's enough
- # to fail the test erroneously...).
- # self.assertEqual(lineno, xxx)
-
- return result
- assertWarns = failUnlessWarns
-
- def failUnlessIsInstance(self, instance, classOrTuple):
- """
- Assert that the given instance is of the given class or of one of the
- given classes.
-
- @param instance: the object to test the type (first argument of the
- C{isinstance} call).
- @type instance: any.
- @param classOrTuple: the class or classes to test against (second
- argument of the C{isinstance} call).
- @type classOrTuple: class, type, or tuple.
- """
- if not isinstance(instance, classOrTuple):
- self.fail("%r is not an instance of %s" % (instance, classOrTuple))
-
- assertIsInstance = failUnlessIsInstance
-
- def failIfIsInstance(self, instance, classOrTuple):
- """
- Assert that the given instance is not of the given class or of one of
- the given classes.
-
- @param instance: the object to test the type (first argument of the
- C{isinstance} call).
- @type instance: any.
- @param classOrTuple: the class or classes to test against (second
- argument of the C{isinstance} call).
- @type classOrTuple: class, type, or tuple.
- """
- if isinstance(instance, classOrTuple):
- self.fail("%r is not an instance of %s" % (instance, classOrTuple))
-
- assertNotIsInstance = failIfIsInstance
-
-
-class _LogObserver(object):
- """
- Observes the Twisted logs and catches any errors.
- """
-
- def __init__(self):
- self._errors = []
- self._added = 0
- self._ignored = []
-
- def _add(self):
- if self._added == 0:
- log.addObserver(self.gotEvent)
- self._oldFE, log._flushErrors = (log._flushErrors, self.flushErrors)
- self._oldIE, log._ignore = (log._ignore, self._ignoreErrors)
- self._oldCI, log._clearIgnores = (log._clearIgnores,
- self._clearIgnores)
- self._added += 1
-
- def _remove(self):
- self._added -= 1
- if self._added == 0:
- log.removeObserver(self.gotEvent)
- log._flushErrors = self._oldFE
- log._ignore = self._oldIE
- log._clearIgnores = self._oldCI
-
- def _ignoreErrors(self, *errorTypes):
- """
- Do not store any errors with any of the given types.
- """
- self._ignored.extend(errorTypes)
-
- def _clearIgnores(self):
- """
- Stop ignoring any errors we might currently be ignoring.
- """
- self._ignored = []
-
- def flushErrors(self, *errorTypes):
- """
- Flush errors from the list of caught errors. If no arguments are
- specified, remove all errors. If arguments are specified, only remove
- errors of those types from the stored list.
- """
- if errorTypes:
- flushed = []
- remainder = []
- for f in self._errors:
- if f.check(*errorTypes):
- flushed.append(f)
- else:
- remainder.append(f)
- self._errors = remainder
- else:
- flushed = self._errors
- self._errors = []
- return flushed
-
- def getErrors(self):
- """
- Return a list of errors caught by this observer.
- """
- return self._errors
-
- def gotEvent(self, event):
- """
- The actual observer method. Called whenever a message is logged.
-
- @param event: A dictionary containing the log message. Actual
- structure undocumented (see source for L{twisted.python.log}).
- """
- if event.get('isError', False) and 'failure' in event:
- f = event['failure']
- if len(self._ignored) == 0 or not f.check(*self._ignored):
- self._errors.append(f)
-
-
-_logObserver = _LogObserver()
-
-_wait_is_running = []
-
-
-class TestCase(_Assertions):
- """
- A unit test. The atom of the unit testing universe.
-
- This class extends C{unittest.TestCase} from the standard library. The
- main feature is the ability to return C{Deferred}s from tests and fixture
- methods and to have the suite wait for those C{Deferred}s to fire.
-
- To write a unit test, subclass C{TestCase} and define a method (say,
- 'test_foo') on the subclass. To run the test, instantiate your subclass
- with the name of the method, and call L{run} on the instance, passing a
- L{TestResult} object.
-
- The C{trial} script will automatically find any C{TestCase} subclasses
- defined in modules beginning with 'test_' and construct test cases for all
- methods beginning with 'test'.
-
- If an error is logged during the test run, the test will fail with an
- error. See L{log.err}.
-
- @ivar failureException: An exception class, defaulting to C{FailTest}. If
- the test method raises this exception, it will be reported as a failure,
- rather than an exception. All of the assertion methods raise this if the
- assertion fails.
-
- @ivar skip: C{None} or a string explaining why this test is to be
- skipped. If defined, the test will not be run. Instead, it will be
- reported to the result object as 'skipped' (if the C{TestResult} supports
- skipping).
-
- @ivar suppress: C{None} or a list of tuples of C{(args, kwargs)} to be
- passed to C{warnings.filterwarnings}. Use these to suppress warnings
- raised in a test. Useful for testing deprecated code. See also
- L{util.suppress}.
-
- @ivar timeout: C{None} or a real number of seconds. If set, the test will
- raise an error if it takes longer than C{timeout} seconds.
-
- @ivar todo: C{None}, a string or a tuple of C{(errors, reason)} where
- C{errors} is either an exception class or an iterable of exception
- classes, and C{reason} is a string. See L{Todo} or L{makeTodo} for more
- information.
- """
-
- implements(itrial.ITestCase)
- failureException = FailTest
-
- def __init__(self, methodName='runTest'):
- """
- Construct an asynchronous test case for C{methodName}.
-
- @param methodName: The name of a method on C{self}. This method should
- be a unit test. That is, it should be a short method that calls some of
- the assert* methods. If C{methodName} is unspecified, L{runTest} will
- be used as the test method. This is mostly useful for testing Trial.
- """
- super(TestCase, self).__init__(methodName)
- self._testMethodName = methodName
- testMethod = getattr(self, methodName)
- self._parents = [testMethod, self]
- self._parents.extend(util.getPythonContainers(testMethod))
- self._shared = (hasattr(self, 'setUpClass') or
- hasattr(self, 'tearDownClass'))
- if self._shared:
- self._prepareClassFixture()
- if not hasattr(self.__class__, '_instances'):
- self._initInstances()
- self.__class__._instances.add(self)
- self._passed = False
- self._cleanups = []
-
- def _initInstances(cls):
- cls._instances = set()
- cls._instancesRun = set()
- _initInstances = classmethod(_initInstances)
-
- def _isFirst(self):
- return len(self.__class__._instancesRun) == 0
-
- def _isLast(self):
- return self.__class__._instancesRun == self.__class__._instances
-
- def _prepareClassFixture(self):
- """Lots of tests assume that test methods all run in the same instance
- of TestCase. This isn't true. Calling this method ensures that
- self.__class__._testCaseInstance contains an instance of this class
- that will remain the same for all tests from this class.
- """
- if not hasattr(self.__class__, '_testCaseInstance'):
- self.__class__._testCaseInstance = self
- if self.__class__._testCaseInstance.__class__ != self.__class__:
- self.__class__._testCaseInstance = self
-
- def _run(self, methodName, result):
- from twisted.internet import reactor
- timeout = self.getTimeout()
- def onTimeout(d):
- e = defer.TimeoutError("%r (%s) still running at %s secs"
- % (self, methodName, timeout))
- f = failure.Failure(e)
- # try to errback the deferred that the test returns (for no gorram
- # reason) (see issue1005 and test_errorPropagation in
- # test_deferred)
- try:
- d.errback(f)
- except defer.AlreadyCalledError:
- # if the deferred has been called already but the *back chain
- # is still unfinished, crash the reactor and report timeout
- # error ourself.
- reactor.crash()
- self._timedOut = True # see self._wait
- todo = self.getTodo()
- if todo is not None and todo.expected(f):
- result.addExpectedFailure(self, f, todo)
- else:
- result.addError(self, f)
- onTimeout = utils.suppressWarnings(
- onTimeout, util.suppress(category=DeprecationWarning))
- if self._shared:
- test = self.__class__._testCaseInstance
- else:
- test = self
- method = getattr(test, methodName)
- d = defer.maybeDeferred(utils.runWithWarningsSuppressed,
- self.getSuppress(), method)
- call = reactor.callLater(timeout, onTimeout, d)
- d.addBoth(lambda x : call.active() and call.cancel() or x)
- return d
-
- def shortDescription(self):
- desc = super(TestCase, self).shortDescription()
- if desc is None:
- return self._testMethodName
- return desc
-
- def __call__(self, *args, **kwargs):
- return self.run(*args, **kwargs)
-
- def deferSetUpClass(self, result):
- if not hasattr(self, 'setUpClass'):
- d = defer.succeed(None)
- d.addCallback(self.deferSetUp, result)
- return d
- d = self._run('setUpClass', result)
- d.addCallbacks(self.deferSetUp, self._ebDeferSetUpClass,
- callbackArgs=(result,),
- errbackArgs=(result,))
- return d
-
- def _ebDeferSetUpClass(self, error, result):
- if error.check(SkipTest):
- result.addSkip(self, self._getReason(error))
- self.__class__._instancesRun.remove(self)
- elif error.check(KeyboardInterrupt):
- result.stop()
- else:
- result.addError(self, error)
- self.__class__._instancesRun.remove(self)
-
- def deferSetUp(self, ignored, result):
- d = self._run('setUp', result)
- d.addCallbacks(self.deferTestMethod, self._ebDeferSetUp,
- callbackArgs=(result,),
- errbackArgs=(result,))
- return d
-
- def _ebDeferSetUp(self, failure, result):
- if failure.check(SkipTest):
- result.addSkip(self, self._getReason(failure))
- else:
- result.addError(self, failure)
- if failure.check(KeyboardInterrupt):
- result.stop()
- return self.deferRunCleanups(None, result)
-
- def deferTestMethod(self, ignored, result):
- d = self._run(self._testMethodName, result)
- d.addCallbacks(self._cbDeferTestMethod, self._ebDeferTestMethod,
- callbackArgs=(result,),
- errbackArgs=(result,))
- d.addBoth(self.deferRunCleanups, result)
- d.addBoth(self.deferTearDown, result)
- if self._shared and hasattr(self, 'tearDownClass') and self._isLast():
- d.addBoth(self.deferTearDownClass, result)
- return d
-
- def _cbDeferTestMethod(self, ignored, result):
- if self.getTodo() is not None:
- result.addUnexpectedSuccess(self, self.getTodo())
- else:
- self._passed = True
- return ignored
-
- def _ebDeferTestMethod(self, f, result):
- todo = self.getTodo()
- if todo is not None and todo.expected(f):
- result.addExpectedFailure(self, f, todo)
- elif f.check(self.failureException, FailTest):
- result.addFailure(self, f)
- elif f.check(KeyboardInterrupt):
- result.addError(self, f)
- result.stop()
- elif f.check(SkipTest):
- result.addSkip(self, self._getReason(f))
- else:
- result.addError(self, f)
-
- def deferTearDown(self, ignored, result):
- d = self._run('tearDown', result)
- d.addErrback(self._ebDeferTearDown, result)
- return d
-
- def _ebDeferTearDown(self, failure, result):
- result.addError(self, failure)
- if failure.check(KeyboardInterrupt):
- result.stop()
- self._passed = False
-
- def deferRunCleanups(self, ignored, result):
- """
- Run any scheduled cleanups and report errors (if any to the result
- object.
- """
- d = self._runCleanups()
- d.addCallback(self._cbDeferRunCleanups, result)
- return d
-
- def _cbDeferRunCleanups(self, cleanupResults, result):
- for flag, failure in cleanupResults:
- if flag == defer.FAILURE:
- result.addError(self, failure)
- if failure.check(KeyboardInterrupt):
- result.stop()
- self._passed = False
-
- def deferTearDownClass(self, ignored, result):
- d = self._run('tearDownClass', result)
- d.addErrback(self._ebTearDownClass, result)
- return d
-
- def _ebTearDownClass(self, error, result):
- if error.check(KeyboardInterrupt):
- result.stop()
- result.addError(self, error)
-
- def _cleanUp(self, result):
- try:
- clean = util._Janitor(self, result).postCaseCleanup()
- if not clean:
- self._passed = False
- except:
- result.addError(self, failure.Failure())
- self._passed = False
- for error in self._observer.getErrors():
- result.addError(self, error)
- self._passed = False
- self.flushLoggedErrors()
- self._removeObserver()
- if self._passed:
- result.addSuccess(self)
-
- def _classCleanUp(self, result):
- try:
- util._Janitor(self, result).postClassCleanup()
- except:
- result.addError(self, failure.Failure())
-
- def _makeReactorMethod(self, name):
- """
- Create a method which wraps the reactor method C{name}. The new
- method issues a deprecation warning and calls the original.
- """
- def _(*a, **kw):
- warnings.warn("reactor.%s cannot be used inside unit tests. "
- "In the future, using %s will fail the test and may "
- "crash or hang the test run."
- % (name, name),
- stacklevel=2, category=DeprecationWarning)
- return self._reactorMethods[name](*a, **kw)
- return _
-
- def _deprecateReactor(self, reactor):
- """
- Deprecate C{iterate}, C{crash} and C{stop} on C{reactor}. That is,
- each method is wrapped in a function that issues a deprecation
- warning, then calls the original.
-
- @param reactor: The Twisted reactor.
- """
- self._reactorMethods = {}
- for name in ['crash', 'iterate', 'stop']:
- self._reactorMethods[name] = getattr(reactor, name)
- setattr(reactor, name, self._makeReactorMethod(name))
-
- def _undeprecateReactor(self, reactor):
- """
- Restore the deprecated reactor methods. Undoes what
- L{_deprecateReactor} did.
-
- @param reactor: The Twisted reactor.
- """
- for name, method in self._reactorMethods.iteritems():
- setattr(reactor, name, method)
- self._reactorMethods = {}
-
- def _installObserver(self):
- self._observer = _logObserver
- self._observer._add()
-
- def _removeObserver(self):
- self._observer._remove()
-
- def flushLoggedErrors(self, *errorTypes):
- """
- Remove stored errors received from the log.
-
- C{TestCase} stores each error logged during the run of the test and
- reports them as errors during the cleanup phase (after C{tearDown}).
-
- @param *errorTypes: If unspecifed, flush all errors. Otherwise, only
- flush errors that match the given types.
-
- @return: A list of failures that have been removed.
- """
- return self._observer.flushErrors(*errorTypes)
-
-
- def addCleanup(self, f, *args, **kwargs):
- """
- Add the given function to a list of functions to be called after the
- test has run, but before C{tearDown}.
-
- Functions will be run in reverse order of being added. This helps
- ensure that tear down complements set up.
-
- The function C{f} may return a Deferred. If so, C{TestCase} will wait
- until the Deferred has fired before proceeding to the next function.
- """
- self._cleanups.append((f, args, kwargs))
-
-
- def _captureDeprecationWarnings(self, f, *args, **kwargs):
- """
- Call C{f} and capture all deprecation warnings.
- """
- warnings = []
- def accumulateDeprecations(message, category, stacklevel):
- self.assertEqual(DeprecationWarning, category)
- self.assertEqual(stacklevel, 2)
- warnings.append(message)
-
- originalMethod = deprecate.getWarningMethod()
- deprecate.setWarningMethod(accumulateDeprecations)
- try:
- result = f(*args, **kwargs)
- finally:
- deprecate.setWarningMethod(originalMethod)
- return (warnings, result)
-
-
- def callDeprecated(self, version, f, *args, **kwargs):
- """
- Call a function that was deprecated at a specific version.
-
- @param version: The version that the function was deprecated in.
- @param f: The deprecated function to call.
- @return: Whatever the function returns.
- """
- warnings, result = self._captureDeprecationWarnings(
- f, *args, **kwargs)
-
- if len(warnings) == 0:
- self.fail('%r is not deprecated.' % (f,))
-
- observedWarning = warnings[0]
- expectedWarning = getDeprecationWarningString(f, version)
- self.assertEqual(expectedWarning, observedWarning)
-
- return result
-
-
- def _runCleanups(self):
- """
- Run the cleanups added with L{addCleanup} in order.
-
- @return: A C{Deferred} that fires when all cleanups are run.
- """
- def _makeFunction(f, args, kwargs):
- return lambda: f(*args, **kwargs)
- callables = []
- while len(self._cleanups) > 0:
- f, args, kwargs = self._cleanups.pop()
- callables.append(_makeFunction(f, args, kwargs))
- return util._runSequentially(callables)
-
-
- def patch(self, obj, attribute, value):
- """
- Monkey patch an object for the duration of the test.
-
- The monkey patch will be reverted at the end of the test using the
- L{addCleanup} mechanism.
-
- The L{MonkeyPatcher} is returned so that users can restore and
- re-apply the monkey patch within their tests.
-
- @param obj: The object to monkey patch.
- @param attribute: The name of the attribute to change.
- @param value: The value to set the attribute to.
- @return: A L{monkey.MonkeyPatcher} object.
- """
- monkeyPatch = monkey.MonkeyPatcher((obj, attribute, value))
- monkeyPatch.patch()
- self.addCleanup(monkeyPatch.restore)
- return monkeyPatch
-
-
- def runTest(self):
- """
- If no C{methodName} argument is passed to the constructor, L{run} will
- treat this method as the thing with the actual test inside.
- """
-
-
- def run(self, result):
- """
- Run the test case, storing the results in C{result}.
-
- First runs C{setUp} on self, then runs the test method (defined in the
- constructor), then runs C{tearDown}. Any of these may return
- L{Deferred}s. After they complete, does some reactor cleanup.
-
- @param result: A L{TestResult} object.
- """
- log.msg("--> %s <--" % (self.id()))
- from twisted.internet import reactor
- new_result = itrial.IReporter(result, None)
- if new_result is None:
- result = PyUnitResultAdapter(result)
- else:
- result = new_result
- self._timedOut = False
- if self._shared and self not in self.__class__._instances:
- self.__class__._instances.add(self)
- result.startTest(self)
- if self.getSkip(): # don't run test methods that are marked as .skip
- result.addSkip(self, self.getSkip())
- result.stopTest(self)
- return
- self._installObserver()
- self._passed = False
- first = False
- if self._shared:
- first = self._isFirst()
- self.__class__._instancesRun.add(self)
- self._deprecateReactor(reactor)
- try:
- if first:
- d = self.deferSetUpClass(result)
- else:
- d = self.deferSetUp(None, result)
- try:
- self._wait(d)
- finally:
- self._cleanUp(result)
- result.stopTest(self)
- if self._shared and self._isLast():
- self._initInstances()
- self._classCleanUp(result)
- if not self._shared:
- self._classCleanUp(result)
- finally:
- self._undeprecateReactor(reactor)
-
- def _getReason(self, f):
- if len(f.value.args) > 0:
- reason = f.value.args[0]
- else:
- warnings.warn(("Do not raise unittest.SkipTest with no "
- "arguments! Give a reason for skipping tests!"),
- stacklevel=2)
- reason = f
- return reason
-
- def getSkip(self):
- """
- Return the skip reason set on this test, if any is set. Checks on the
- instance first, then the class, then the module, then packages. As
- soon as it finds something with a C{skip} attribute, returns that.
- Returns C{None} if it cannot find anything. See L{TestCase} docstring
- for more details.
- """
- return util.acquireAttribute(self._parents, 'skip', None)
-
- def getTodo(self):
- """
- Return a L{Todo} object if the test is marked todo. Checks on the
- instance first, then the class, then the module, then packages. As
- soon as it finds something with a C{todo} attribute, returns that.
- Returns C{None} if it cannot find anything. See L{TestCase} docstring
- for more details.
- """
- todo = util.acquireAttribute(self._parents, 'todo', None)
- if todo is None:
- return None
- return makeTodo(todo)
-
- def getTimeout(self):
- """
- Returns the timeout value set on this test. Checks on the instance
- first, then the class, then the module, then packages. As soon as it
- finds something with a C{timeout} attribute, returns that. Returns
- L{util.DEFAULT_TIMEOUT_DURATION} if it cannot find anything. See
- L{TestCase} docstring for more details.
- """
- timeout = util.acquireAttribute(self._parents, 'timeout',
- util.DEFAULT_TIMEOUT_DURATION)
- try:
- return float(timeout)
- except (ValueError, TypeError):
- # XXX -- this is here because sometimes people will have methods
- # called 'timeout', or set timeout to 'orange', or something
- # Particularly, test_news.NewsTestCase and ReactorCoreTestCase
- # both do this.
- warnings.warn("'timeout' attribute needs to be a number.",
- category=DeprecationWarning)
- return util.DEFAULT_TIMEOUT_DURATION
-
- def getSuppress(self):
- """
- Returns any warning suppressions set for this test. Checks on the
- instance first, then the class, then the module, then packages. As
- soon as it finds something with a C{suppress} attribute, returns that.
- Returns any empty list (i.e. suppress no warnings) if it cannot find
- anything. See L{TestCase} docstring for more details.
- """
- return util.acquireAttribute(self._parents, 'suppress', [])
-
-
- def visit(self, visitor):
- """
- Visit this test case. Call C{visitor} with C{self} as a parameter.
-
- Deprecated in Twisted 8.0.
-
- @param visitor: A callable which expects a single parameter: a test
- case.
-
- @return: None
- """
- warnings.warn("Test visitors deprecated in Twisted 8.0",
- category=DeprecationWarning)
- visitor(self)
-
-
- def mktemp(self):
- """Returns a unique name that may be used as either a temporary
- directory or filename.
-
- @note: you must call os.mkdir on the value returned from this
- method if you wish to use it as a directory!
- """
- MAX_FILENAME = 32 # some platforms limit lengths of filenames
- base = os.path.join(self.__class__.__module__[:MAX_FILENAME],
- self.__class__.__name__[:MAX_FILENAME],
- self._testMethodName[:MAX_FILENAME])
- if not os.path.exists(base):
- os.makedirs(base)
- dirname = tempfile.mkdtemp('', '', base)
- return os.path.join(dirname, 'temp')
-
- def _wait(self, d, running=_wait_is_running):
- """Take a Deferred that only ever callbacks. Block until it happens.
- """
- from twisted.internet import reactor
- if running:
- raise RuntimeError("_wait is not reentrant")
-
- results = []
- def append(any):
- if results is not None:
- results.append(any)
- def crash(ign):
- if results is not None:
- reactor.crash()
- crash = utils.suppressWarnings(
- crash, util.suppress(message=r'reactor\.crash cannot be used.*',
- category=DeprecationWarning))
- def stop():
- reactor.crash()
- stop = utils.suppressWarnings(
- stop, util.suppress(message=r'reactor\.crash cannot be used.*',
- category=DeprecationWarning))
-
- running.append(None)
- try:
- d.addBoth(append)
- if results:
- # d might have already been fired, in which case append is
- # called synchronously. Avoid any reactor stuff.
- return
- d.addBoth(crash)
- reactor.stop = stop
- try:
- reactor.run()
- finally:
- del reactor.stop
-
- # If the reactor was crashed elsewhere due to a timeout, hopefully
- # that crasher also reported an error. Just return.
- # _timedOut is most likely to be set when d has fired but hasn't
- # completed its callback chain (see self._run)
- if results or self._timedOut: #defined in run() and _run()
- return
-
- # If the timeout didn't happen, and we didn't get a result or
- # a failure, then the user probably aborted the test, so let's
- # just raise KeyboardInterrupt.
-
- # FIXME: imagine this:
- # web/test/test_webclient.py:
- # exc = self.assertRaises(error.Error, wait, method(url))
- #
- # wait() will raise KeyboardInterrupt, and assertRaises will
- # swallow it. Therefore, wait() raising KeyboardInterrupt is
- # insufficient to stop trial. A suggested solution is to have
- # this code set a "stop trial" flag, or otherwise notify trial
- # that it should really try to stop as soon as possible.
- raise KeyboardInterrupt()
- finally:
- results = None
- running.pop()
-
-
-class UnsupportedTrialFeature(Exception):
- """A feature of twisted.trial was used that pyunit cannot support."""
-
-
-class PyUnitResultAdapter(object):
- """
- Wrap a C{TestResult} from the standard library's C{unittest} so that it
- supports the extended result types from Trial, and also supports
- L{twisted.python.failure.Failure}s being passed to L{addError} and
- L{addFailure}.
- """
-
- def __init__(self, original):
- """
- @param original: A C{TestResult} instance from C{unittest}.
- """
- self.original = original
-
- def _exc_info(self, err):
- if isinstance(err, failure.Failure):
- # Unwrap the Failure into a exc_info tuple.
- err = (err.type, err.value, err.getTracebackObject())
- return err
-
- def startTest(self, method):
- self.original.startTest(method)
-
- def stopTest(self, method):
- self.original.stopTest(method)
-
- def addFailure(self, test, fail):
- self.original.addFailure(test, self._exc_info(fail))
-
- def addError(self, test, error):
- self.original.addError(test, self._exc_info(error))
-
- def _unsupported(self, test, feature, info):
- self.original.addFailure(
- test,
- (UnsupportedTrialFeature,
- UnsupportedTrialFeature(feature, info),
- None))
-
- def addSkip(self, test, reason):
- """
- Report the skip as a failure.
- """
- self._unsupported(test, 'skip', reason)
-
- def addUnexpectedSuccess(self, test, todo):
- """
- Report the unexpected success as a failure.
- """
- self._unsupported(test, 'unexpected success', todo)
-
- def addExpectedFailure(self, test, error):
- """
- Report the expected failure (i.e. todo) as a failure.
- """
- self._unsupported(test, 'expected failure', error)
-
- def addSuccess(self, test):
- self.original.addSuccess(test)
-
- def upDownError(self, method, error, warn, printStatus):
- pass
-
-
-
-def suiteVisit(suite, visitor):
- """
- Visit each test in C{suite} with C{visitor}.
-
- Deprecated in Twisted 8.0.
-
- @param visitor: A callable which takes a single argument, the L{TestCase}
- instance to visit.
- @return: None
- """
- warnings.warn("Test visitors deprecated in Twisted 8.0",
- category=DeprecationWarning)
- for case in suite._tests:
- visit = getattr(case, 'visit', None)
- if visit is not None:
- visit(visitor)
- elif isinstance(case, pyunit.TestCase):
- case = itrial.ITestCase(case)
- case.visit(visitor)
- elif isinstance(case, pyunit.TestSuite):
- suiteVisit(case, visitor)
- else:
- case.visit(visitor)
-
-
-
-class TestSuite(pyunit.TestSuite):
- """
- Extend the standard library's C{TestSuite} with support for the visitor
- pattern and a consistently overrideable C{run} method.
- """
-
- visit = suiteVisit
-
- def __call__(self, result):
- return self.run(result)
-
-
- def run(self, result):
- """
- Call C{run} on every member of the suite.
- """
- # we implement this because Python 2.3 unittest defines this code
- # in __call__, whereas 2.4 defines the code in run.
- for test in self._tests:
- if result.shouldStop:
- break
- test(result)
- return result
-
-
-
-class TestDecorator(components.proxyForInterface(itrial.ITestCase,
- "_originalTest")):
- """
- Decorator for test cases.
-
- @param _originalTest: The wrapped instance of test.
- @type _originalTest: A provider of L{itrial.ITestCase}
- """
-
- implements(itrial.ITestCase)
-
-
- def __call__(self, result):
- """
- Run the unit test.
-
- @param result: A TestResult object.
- """
- return self.run(result)
-
-
- def run(self, result):
- """
- Run the unit test.
-
- @param result: A TestResult object.
- """
- return self._originalTest.run(
- reporter._AdaptedReporter(result, self.__class__))
-
-
-
-def _clearSuite(suite):
- """
- Clear all tests from C{suite}.
-
- This messes with the internals of C{suite}. In particular, it assumes that
- the suite keeps all of its tests in a list in an instance variable called
- C{_tests}.
- """
- suite._tests = []
-
-
-def decorate(test, decorator):
- """
- Decorate all test cases in C{test} with C{decorator}.
-
- C{test} can be a test case or a test suite. If it is a test suite, then the
- structure of the suite is preserved.
-
- L{decorate} tries to preserve the class of the test suites it finds, but
- assumes the presence of the C{_tests} attribute on the suite.
-
- @param test: The C{TestCase} or C{TestSuite} to decorate.
-
- @param decorator: A unary callable used to decorate C{TestCase}s.
-
- @return: A decorated C{TestCase} or a C{TestSuite} containing decorated
- C{TestCase}s.
- """
-
- try:
- tests = iter(test)
- except TypeError:
- return decorator(test)
-
- # At this point, we know that 'test' is a test suite.
- _clearSuite(test)
-
- for case in tests:
- test.addTest(decorate(case, decorator))
- return test
-
-
-
-class _PyUnitTestCaseAdapter(TestDecorator):
- """
- Adapt from pyunit.TestCase to ITestCase.
- """
-
-
- def visit(self, visitor):
- """
- Deprecated in Twisted 8.0.
- """
- warnings.warn("Test visitors deprecated in Twisted 8.0",
- category=DeprecationWarning)
- visitor(self)
-
-
-
-class _BrokenIDTestCaseAdapter(_PyUnitTestCaseAdapter):
- """
- Adapter for pyunit-style C{TestCase} subclasses that have undesirable id()
- methods. That is L{pyunit.FunctionTestCase} and L{pyunit.DocTestCase}.
- """
-
- def id(self):
- """
- Return the fully-qualified Python name of the doctest.
- """
- testID = self._originalTest.shortDescription()
- if testID is not None:
- return testID
- return self._originalTest.id()
-
-
-
-class _ForceGarbageCollectionDecorator(TestDecorator):
- """
- Forces garbage collection to be run before and after the test. Any errors
- logged during the post-test collection are added to the test result as
- errors.
- """
-
- def run(self, result):
- gc.collect()
- TestDecorator.run(self, result)
- _logObserver._add()
- gc.collect()
- for error in _logObserver.getErrors():
- result.addError(self, error)
- _logObserver.flushErrors()
- _logObserver._remove()
-
-
-components.registerAdapter(
- _PyUnitTestCaseAdapter, pyunit.TestCase, itrial.ITestCase)
-
-
-components.registerAdapter(
- _BrokenIDTestCaseAdapter, pyunit.FunctionTestCase, itrial.ITestCase)
-
-
-_docTestCase = getattr(doctest, 'DocTestCase', None)
-if _docTestCase:
- components.registerAdapter(
- _BrokenIDTestCaseAdapter, _docTestCase, itrial.ITestCase)
-
-
-def _iterateTests(testSuiteOrCase):
- """
- Iterate through all of the test cases in C{testSuiteOrCase}.
- """
- try:
- suite = iter(testSuiteOrCase)
- except TypeError:
- yield testSuiteOrCase
- else:
- for test in suite:
- for subtest in _iterateTests(test):
- yield subtest
-
-
-
-# Support for Python 2.3
-try:
- iter(pyunit.TestSuite())
-except TypeError:
- # Python 2.3's TestSuite doesn't support iteration. Let's monkey patch it!
- def __iter__(self):
- return iter(self._tests)
- pyunit.TestSuite.__iter__ = __iter__
-
-
-
-class _SubTestCase(TestCase):
- def __init__(self):
- TestCase.__init__(self, 'run')
-
-_inst = _SubTestCase()
-
-def _deprecate(name):
- """
- Internal method used to deprecate top-level assertions. Do not use this.
- """
- def _(*args, **kwargs):
- warnings.warn("unittest.%s is deprecated. Instead use the %r "
- "method on unittest.TestCase" % (name, name),
- stacklevel=2, category=DeprecationWarning)
- return getattr(_inst, name)(*args, **kwargs)
- return _
-
-
-_assertions = ['fail', 'failUnlessEqual', 'failIfEqual', 'failIfEquals',
- 'failUnless', 'failUnlessIdentical', 'failUnlessIn',
- 'failIfIdentical', 'failIfIn', 'failIf',
- 'failUnlessAlmostEqual', 'failIfAlmostEqual',
- 'failUnlessRaises', 'assertApproximates',
- 'assertFailure', 'failUnlessSubstring', 'failIfSubstring',
- 'assertAlmostEqual', 'assertAlmostEquals',
- 'assertNotAlmostEqual', 'assertNotAlmostEquals', 'assertEqual',
- 'assertEquals', 'assertNotEqual', 'assertNotEquals',
- 'assertRaises', 'assert_', 'assertIdentical',
- 'assertNotIdentical', 'assertIn', 'assertNotIn',
- 'failUnlessFailure', 'assertSubstring', 'assertNotSubstring']
-
-
-for methodName in _assertions:
- globals()[methodName] = _deprecate(methodName)
-
-
-__all__ = ['TestCase', 'wait', 'FailTest', 'SkipTest']
-
« no previous file with comments | « third_party/twisted_8_1/twisted/trial/test/weird.py ('k') | third_party/twisted_8_1/twisted/trial/util.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698