Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(103)

Side by Side Diff: third_party/twisted_8_1/twisted/trial/unittest.py

Issue 12261012: Remove third_party/twisted_8_1 (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/build
Patch Set: Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 # -*- test-case-name: twisted.trial.test.test_tests -*-
2 #
3 # Copyright (c) 2001-2007 Twisted Matrix Laboratories.
4 # See LICENSE for details.
5
6 """
7 Things likely to be used by writers of unit tests.
8
9 Maintainer: Jonathan Lange <jml@twistedmatrix.com>
10 """
11
12
13 import doctest
14 import os, warnings, sys, tempfile, gc
15 from pprint import pformat
16
17 from twisted.internet import defer, utils
18 from twisted.python import components, failure, log, monkey
19 from twisted.python.compat import set
20 from twisted.python import deprecate
21 from twisted.python.deprecate import getDeprecationWarningString
22
23 from twisted.trial import itrial, reporter, util
24
25 pyunit = __import__('unittest')
26
27 from zope.interface import implements
28
29
30
31 class SkipTest(Exception):
32 """
33 Raise this (with a reason) to skip the current test. You may also set
34 method.skip to a reason string to skip it, or set class.skip to skip the
35 entire TestCase.
36 """
37
38
39 class FailTest(AssertionError):
40 """Raised to indicate the current test has failed to pass."""
41
42
43 class Todo(object):
44 """
45 Internal object used to mark a L{TestCase} as 'todo'. Tests marked 'todo'
46 are reported differently in Trial L{TestResult}s. If todo'd tests fail,
47 they do not fail the suite and the errors are reported in a separate
48 category. If todo'd tests succeed, Trial L{TestResult}s will report an
49 unexpected success.
50 """
51
52 def __init__(self, reason, errors=None):
53 """
54 @param reason: A string explaining why the test is marked 'todo'
55
56 @param errors: An iterable of exception types that the test is
57 expected to raise. If one of these errors is raised by the test, it
58 will be trapped. Raising any other kind of error will fail the test.
59 If C{None} is passed, then all errors will be trapped.
60 """
61 self.reason = reason
62 self.errors = errors
63
64 def __repr__(self):
65 return "<Todo reason=%r errors=%r>" % (self.reason, self.errors)
66
67 def expected(self, failure):
68 """
69 @param failure: A L{twisted.python.failure.Failure}.
70
71 @return: C{True} if C{failure} is expected, C{False} otherwise.
72 """
73 if self.errors is None:
74 return True
75 for error in self.errors:
76 if failure.check(error):
77 return True
78 return False
79
80
81 def makeTodo(value):
82 """
83 Return a L{Todo} object built from C{value}.
84
85 If C{value} is a string, return a Todo that expects any exception with
86 C{value} as a reason. If C{value} is a tuple, the second element is used
87 as the reason and the first element as the excepted error(s).
88
89 @param value: A string or a tuple of C{(errors, reason)}, where C{errors}
90 is either a single exception class or an iterable of exception classes.
91
92 @return: A L{Todo} object.
93 """
94 if isinstance(value, str):
95 return Todo(reason=value)
96 if isinstance(value, tuple):
97 errors, reason = value
98 try:
99 errors = list(errors)
100 except TypeError:
101 errors = [errors]
102 return Todo(reason=reason, errors=errors)
103
104
105 class _Assertions(pyunit.TestCase, object):
106 """
107 Replaces many of the built-in TestCase assertions. In general, these
108 assertions provide better error messages and are easier to use in
109 callbacks. Also provides new assertions such as L{failUnlessFailure}.
110
111 Although the tests are defined as 'failIf*' and 'failUnless*', they can
112 also be called as 'assertNot*' and 'assert*'.
113 """
114
115 def fail(self, msg=None):
116 """absolutely fails the test, do not pass go, do not collect $200
117
118 @param msg: the message that will be displayed as the reason for the
119 failure
120 """
121 raise self.failureException(msg)
122
123 def failIf(self, condition, msg=None):
124 """fails the test if C{condition} evaluates to False
125
126 @param condition: any object that defines __nonzero__
127 """
128 if condition:
129 raise self.failureException(msg)
130 return condition
131 assertNot = assertFalse = failUnlessFalse = failIf
132
133 def failUnless(self, condition, msg=None):
134 """fails the test if C{condition} evaluates to True
135
136 @param condition: any object that defines __nonzero__
137 """
138 if not condition:
139 raise self.failureException(msg)
140 return condition
141 assert_ = assertTrue = failUnlessTrue = failUnless
142
143 def failUnlessRaises(self, exception, f, *args, **kwargs):
144 """fails the test unless calling the function C{f} with the given C{args }
145 and C{kwargs} raises C{exception}. The failure will report the
146 traceback and call stack of the unexpected exception.
147
148 @param exception: exception type that is to be expected
149 @param f: the function to call
150
151 @return: The raised exception instance, if it is of the given type.
152 @raise self.failureException: Raised if the function call does not raise an exception
153 or if it raises an exception of a different type.
154 """
155 try:
156 result = f(*args, **kwargs)
157 except exception, inst:
158 return inst
159 except:
160 raise self.failureException('%s raised instead of %s:\n %s'
161 % (sys.exc_info()[0],
162 exception.__name__,
163 failure.Failure().getTraceback()))
164 else:
165 raise self.failureException('%s not raised (%r returned)'
166 % (exception.__name__, result))
167 assertRaises = failUnlessRaises
168
169 def failUnlessEqual(self, first, second, msg=''):
170 """
171 Fail the test if C{first} and C{second} are not equal.
172
173 @param msg: A string describing the failure that's included in the
174 exception.
175 """
176 if not first == second:
177 if msg is None:
178 msg = ''
179 if len(msg) > 0:
180 msg += '\n'
181 raise self.failureException(
182 '%snot equal:\na = %s\nb = %s\n'
183 % (msg, pformat(first), pformat(second)))
184 return first
185 assertEqual = assertEquals = failUnlessEquals = failUnlessEqual
186
187 def failUnlessIdentical(self, first, second, msg=None):
188 """fail the test if C{first} is not C{second}. This is an
189 obect-identity-equality test, not an object equality (i.e. C{__eq__}) te st
190
191 @param msg: if msg is None, then the failure message will be
192 '%r is not %r' % (first, second)
193 """
194 if first is not second:
195 raise self.failureException(msg or '%r is not %r' % (first, second))
196 return first
197 assertIdentical = failUnlessIdentical
198
199 def failIfIdentical(self, first, second, msg=None):
200 """fail the test if C{first} is C{second}. This is an
201 obect-identity-equality test, not an object equality (i.e. C{__eq__}) te st
202
203 @param msg: if msg is None, then the failure message will be
204 '%r is %r' % (first, second)
205 """
206 if first is second:
207 raise self.failureException(msg or '%r is %r' % (first, second))
208 return first
209 assertNotIdentical = failIfIdentical
210
211 def failIfEqual(self, first, second, msg=None):
212 """fail the test if C{first} == C{second}
213
214 @param msg: if msg is None, then the failure message will be
215 '%r == %r' % (first, second)
216 """
217 if not first != second:
218 raise self.failureException(msg or '%r == %r' % (first, second))
219 return first
220 assertNotEqual = assertNotEquals = failIfEquals = failIfEqual
221
222 def failUnlessIn(self, containee, container, msg=None):
223 """fail the test if C{containee} is not found in C{container}
224
225 @param containee: the value that should be in C{container}
226 @param container: a sequence type, or in the case of a mapping type,
227 will follow semantics of 'if key in dict.keys()'
228 @param msg: if msg is None, then the failure message will be
229 '%r not in %r' % (first, second)
230 """
231 if containee not in container:
232 raise self.failureException(msg or "%r not in %r"
233 % (containee, container))
234 return containee
235 assertIn = failUnlessIn
236
237 def failIfIn(self, containee, container, msg=None):
238 """fail the test if C{containee} is found in C{container}
239
240 @param containee: the value that should not be in C{container}
241 @param container: a sequence type, or in the case of a mapping type,
242 will follow semantics of 'if key in dict.keys()'
243 @param msg: if msg is None, then the failure message will be
244 '%r in %r' % (first, second)
245 """
246 if containee in container:
247 raise self.failureException(msg or "%r in %r"
248 % (containee, container))
249 return containee
250 assertNotIn = failIfIn
251
252 def failIfAlmostEqual(self, first, second, places=7, msg=None):
253 """Fail if the two objects are equal as determined by their
254 difference rounded to the given number of decimal places
255 (default 7) and comparing to zero.
256
257 @note: decimal places (from zero) is usually not the same
258 as significant digits (measured from the most
259 signficant digit).
260
261 @note: included for compatiblity with PyUnit test cases
262 """
263 if round(second-first, places) == 0:
264 raise self.failureException(msg or '%r == %r within %r places'
265 % (first, second, places))
266 return first
267 assertNotAlmostEqual = assertNotAlmostEquals = failIfAlmostEqual
268 failIfAlmostEquals = failIfAlmostEqual
269
270 def failUnlessAlmostEqual(self, first, second, places=7, msg=None):
271 """Fail if the two objects are unequal as determined by their
272 difference rounded to the given number of decimal places
273 (default 7) and comparing to zero.
274
275 @note: decimal places (from zero) is usually not the same
276 as significant digits (measured from the most
277 signficant digit).
278
279 @note: included for compatiblity with PyUnit test cases
280 """
281 if round(second-first, places) != 0:
282 raise self.failureException(msg or '%r != %r within %r places'
283 % (first, second, places))
284 return first
285 assertAlmostEqual = assertAlmostEquals = failUnlessAlmostEqual
286 failUnlessAlmostEquals = failUnlessAlmostEqual
287
288 def failUnlessApproximates(self, first, second, tolerance, msg=None):
289 """asserts that C{first} - C{second} > C{tolerance}
290
291 @param msg: if msg is None, then the failure message will be
292 '%r ~== %r' % (first, second)
293 """
294 if abs(first - second) > tolerance:
295 raise self.failureException(msg or "%s ~== %s" % (first, second))
296 return first
297 assertApproximates = failUnlessApproximates
298
299 def failUnlessFailure(self, deferred, *expectedFailures):
300 """Assert that C{deferred} will errback with one of
301 C{expectedFailures}. Returns the original Deferred with callbacks
302 added. You will need to return this Deferred from your test case.
303 """
304 def _cb(ignore):
305 raise self.failureException(
306 "did not catch an error, instead got %r" % (ignore,))
307
308 def _eb(failure):
309 if failure.check(*expectedFailures):
310 return failure.value
311 else:
312 output = ('\nExpected: %r\nGot:\n%s'
313 % (expectedFailures, str(failure)))
314 raise self.failureException(output)
315 return deferred.addCallbacks(_cb, _eb)
316 assertFailure = failUnlessFailure
317
318 def failUnlessSubstring(self, substring, astring, msg=None):
319 return self.failUnlessIn(substring, astring, msg)
320 assertSubstring = failUnlessSubstring
321
322 def failIfSubstring(self, substring, astring, msg=None):
323 return self.failIfIn(substring, astring, msg)
324 assertNotSubstring = failIfSubstring
325
326 def failUnlessWarns(self, category, message, filename, f,
327 *args, **kwargs):
328 """
329 Fail if the given function doesn't generate the specified warning when
330 called. It calls the function, checks the warning, and forwards the
331 result of the function if everything is fine.
332
333 @param category: the category of the warning to check.
334 @param message: the output message of the warning to check.
335 @param filename: the filename where the warning should come from.
336 @param f: the function which is supposed to generate the warning.
337 @type f: any callable.
338 @param args: the arguments to C{f}.
339 @param kwargs: the keywords arguments to C{f}.
340
341 @return: the result of the original function C{f}.
342 """
343 warningsShown = []
344 def warnExplicit(*args):
345 warningsShown.append(args)
346
347 origExplicit = warnings.warn_explicit
348 try:
349 warnings.warn_explicit = warnExplicit
350 result = f(*args, **kwargs)
351 finally:
352 warnings.warn_explicit = origExplicit
353
354 if not warningsShown:
355 self.fail("No warnings emitted")
356 first = warningsShown[0]
357 for other in warningsShown[1:]:
358 if other[:2] != first[:2]:
359 self.fail("Can't handle different warnings")
360 gotMessage, gotCategory, gotFilename, lineno = first[:4]
361 self.assertEqual(gotMessage, message)
362 self.assertIdentical(gotCategory, category)
363
364 # Use starts with because of .pyc/.pyo issues.
365 self.failUnless(
366 filename.startswith(gotFilename),
367 'Warning in %r, expected %r' % (gotFilename, filename))
368
369 # It would be nice to be able to check the line number as well, but
370 # different configurations actually end up reporting different line
371 # numbers (generally the variation is only 1 line, but that's enough
372 # to fail the test erroneously...).
373 # self.assertEqual(lineno, xxx)
374
375 return result
376 assertWarns = failUnlessWarns
377
378 def failUnlessIsInstance(self, instance, classOrTuple):
379 """
380 Assert that the given instance is of the given class or of one of the
381 given classes.
382
383 @param instance: the object to test the type (first argument of the
384 C{isinstance} call).
385 @type instance: any.
386 @param classOrTuple: the class or classes to test against (second
387 argument of the C{isinstance} call).
388 @type classOrTuple: class, type, or tuple.
389 """
390 if not isinstance(instance, classOrTuple):
391 self.fail("%r is not an instance of %s" % (instance, classOrTuple))
392
393 assertIsInstance = failUnlessIsInstance
394
395 def failIfIsInstance(self, instance, classOrTuple):
396 """
397 Assert that the given instance is not of the given class or of one of
398 the given classes.
399
400 @param instance: the object to test the type (first argument of the
401 C{isinstance} call).
402 @type instance: any.
403 @param classOrTuple: the class or classes to test against (second
404 argument of the C{isinstance} call).
405 @type classOrTuple: class, type, or tuple.
406 """
407 if isinstance(instance, classOrTuple):
408 self.fail("%r is not an instance of %s" % (instance, classOrTuple))
409
410 assertNotIsInstance = failIfIsInstance
411
412
413 class _LogObserver(object):
414 """
415 Observes the Twisted logs and catches any errors.
416 """
417
418 def __init__(self):
419 self._errors = []
420 self._added = 0
421 self._ignored = []
422
423 def _add(self):
424 if self._added == 0:
425 log.addObserver(self.gotEvent)
426 self._oldFE, log._flushErrors = (log._flushErrors, self.flushErrors)
427 self._oldIE, log._ignore = (log._ignore, self._ignoreErrors)
428 self._oldCI, log._clearIgnores = (log._clearIgnores,
429 self._clearIgnores)
430 self._added += 1
431
432 def _remove(self):
433 self._added -= 1
434 if self._added == 0:
435 log.removeObserver(self.gotEvent)
436 log._flushErrors = self._oldFE
437 log._ignore = self._oldIE
438 log._clearIgnores = self._oldCI
439
440 def _ignoreErrors(self, *errorTypes):
441 """
442 Do not store any errors with any of the given types.
443 """
444 self._ignored.extend(errorTypes)
445
446 def _clearIgnores(self):
447 """
448 Stop ignoring any errors we might currently be ignoring.
449 """
450 self._ignored = []
451
452 def flushErrors(self, *errorTypes):
453 """
454 Flush errors from the list of caught errors. If no arguments are
455 specified, remove all errors. If arguments are specified, only remove
456 errors of those types from the stored list.
457 """
458 if errorTypes:
459 flushed = []
460 remainder = []
461 for f in self._errors:
462 if f.check(*errorTypes):
463 flushed.append(f)
464 else:
465 remainder.append(f)
466 self._errors = remainder
467 else:
468 flushed = self._errors
469 self._errors = []
470 return flushed
471
472 def getErrors(self):
473 """
474 Return a list of errors caught by this observer.
475 """
476 return self._errors
477
478 def gotEvent(self, event):
479 """
480 The actual observer method. Called whenever a message is logged.
481
482 @param event: A dictionary containing the log message. Actual
483 structure undocumented (see source for L{twisted.python.log}).
484 """
485 if event.get('isError', False) and 'failure' in event:
486 f = event['failure']
487 if len(self._ignored) == 0 or not f.check(*self._ignored):
488 self._errors.append(f)
489
490
491 _logObserver = _LogObserver()
492
493 _wait_is_running = []
494
495
496 class TestCase(_Assertions):
497 """
498 A unit test. The atom of the unit testing universe.
499
500 This class extends C{unittest.TestCase} from the standard library. The
501 main feature is the ability to return C{Deferred}s from tests and fixture
502 methods and to have the suite wait for those C{Deferred}s to fire.
503
504 To write a unit test, subclass C{TestCase} and define a method (say,
505 'test_foo') on the subclass. To run the test, instantiate your subclass
506 with the name of the method, and call L{run} on the instance, passing a
507 L{TestResult} object.
508
509 The C{trial} script will automatically find any C{TestCase} subclasses
510 defined in modules beginning with 'test_' and construct test cases for all
511 methods beginning with 'test'.
512
513 If an error is logged during the test run, the test will fail with an
514 error. See L{log.err}.
515
516 @ivar failureException: An exception class, defaulting to C{FailTest}. If
517 the test method raises this exception, it will be reported as a failure,
518 rather than an exception. All of the assertion methods raise this if the
519 assertion fails.
520
521 @ivar skip: C{None} or a string explaining why this test is to be
522 skipped. If defined, the test will not be run. Instead, it will be
523 reported to the result object as 'skipped' (if the C{TestResult} supports
524 skipping).
525
526 @ivar suppress: C{None} or a list of tuples of C{(args, kwargs)} to be
527 passed to C{warnings.filterwarnings}. Use these to suppress warnings
528 raised in a test. Useful for testing deprecated code. See also
529 L{util.suppress}.
530
531 @ivar timeout: C{None} or a real number of seconds. If set, the test will
532 raise an error if it takes longer than C{timeout} seconds.
533
534 @ivar todo: C{None}, a string or a tuple of C{(errors, reason)} where
535 C{errors} is either an exception class or an iterable of exception
536 classes, and C{reason} is a string. See L{Todo} or L{makeTodo} for more
537 information.
538 """
539
540 implements(itrial.ITestCase)
541 failureException = FailTest
542
543 def __init__(self, methodName='runTest'):
544 """
545 Construct an asynchronous test case for C{methodName}.
546
547 @param methodName: The name of a method on C{self}. This method should
548 be a unit test. That is, it should be a short method that calls some of
549 the assert* methods. If C{methodName} is unspecified, L{runTest} will
550 be used as the test method. This is mostly useful for testing Trial.
551 """
552 super(TestCase, self).__init__(methodName)
553 self._testMethodName = methodName
554 testMethod = getattr(self, methodName)
555 self._parents = [testMethod, self]
556 self._parents.extend(util.getPythonContainers(testMethod))
557 self._shared = (hasattr(self, 'setUpClass') or
558 hasattr(self, 'tearDownClass'))
559 if self._shared:
560 self._prepareClassFixture()
561 if not hasattr(self.__class__, '_instances'):
562 self._initInstances()
563 self.__class__._instances.add(self)
564 self._passed = False
565 self._cleanups = []
566
567 def _initInstances(cls):
568 cls._instances = set()
569 cls._instancesRun = set()
570 _initInstances = classmethod(_initInstances)
571
572 def _isFirst(self):
573 return len(self.__class__._instancesRun) == 0
574
575 def _isLast(self):
576 return self.__class__._instancesRun == self.__class__._instances
577
578 def _prepareClassFixture(self):
579 """Lots of tests assume that test methods all run in the same instance
580 of TestCase. This isn't true. Calling this method ensures that
581 self.__class__._testCaseInstance contains an instance of this class
582 that will remain the same for all tests from this class.
583 """
584 if not hasattr(self.__class__, '_testCaseInstance'):
585 self.__class__._testCaseInstance = self
586 if self.__class__._testCaseInstance.__class__ != self.__class__:
587 self.__class__._testCaseInstance = self
588
589 def _run(self, methodName, result):
590 from twisted.internet import reactor
591 timeout = self.getTimeout()
592 def onTimeout(d):
593 e = defer.TimeoutError("%r (%s) still running at %s secs"
594 % (self, methodName, timeout))
595 f = failure.Failure(e)
596 # try to errback the deferred that the test returns (for no gorram
597 # reason) (see issue1005 and test_errorPropagation in
598 # test_deferred)
599 try:
600 d.errback(f)
601 except defer.AlreadyCalledError:
602 # if the deferred has been called already but the *back chain
603 # is still unfinished, crash the reactor and report timeout
604 # error ourself.
605 reactor.crash()
606 self._timedOut = True # see self._wait
607 todo = self.getTodo()
608 if todo is not None and todo.expected(f):
609 result.addExpectedFailure(self, f, todo)
610 else:
611 result.addError(self, f)
612 onTimeout = utils.suppressWarnings(
613 onTimeout, util.suppress(category=DeprecationWarning))
614 if self._shared:
615 test = self.__class__._testCaseInstance
616 else:
617 test = self
618 method = getattr(test, methodName)
619 d = defer.maybeDeferred(utils.runWithWarningsSuppressed,
620 self.getSuppress(), method)
621 call = reactor.callLater(timeout, onTimeout, d)
622 d.addBoth(lambda x : call.active() and call.cancel() or x)
623 return d
624
625 def shortDescription(self):
626 desc = super(TestCase, self).shortDescription()
627 if desc is None:
628 return self._testMethodName
629 return desc
630
631 def __call__(self, *args, **kwargs):
632 return self.run(*args, **kwargs)
633
634 def deferSetUpClass(self, result):
635 if not hasattr(self, 'setUpClass'):
636 d = defer.succeed(None)
637 d.addCallback(self.deferSetUp, result)
638 return d
639 d = self._run('setUpClass', result)
640 d.addCallbacks(self.deferSetUp, self._ebDeferSetUpClass,
641 callbackArgs=(result,),
642 errbackArgs=(result,))
643 return d
644
645 def _ebDeferSetUpClass(self, error, result):
646 if error.check(SkipTest):
647 result.addSkip(self, self._getReason(error))
648 self.__class__._instancesRun.remove(self)
649 elif error.check(KeyboardInterrupt):
650 result.stop()
651 else:
652 result.addError(self, error)
653 self.__class__._instancesRun.remove(self)
654
655 def deferSetUp(self, ignored, result):
656 d = self._run('setUp', result)
657 d.addCallbacks(self.deferTestMethod, self._ebDeferSetUp,
658 callbackArgs=(result,),
659 errbackArgs=(result,))
660 return d
661
662 def _ebDeferSetUp(self, failure, result):
663 if failure.check(SkipTest):
664 result.addSkip(self, self._getReason(failure))
665 else:
666 result.addError(self, failure)
667 if failure.check(KeyboardInterrupt):
668 result.stop()
669 return self.deferRunCleanups(None, result)
670
671 def deferTestMethod(self, ignored, result):
672 d = self._run(self._testMethodName, result)
673 d.addCallbacks(self._cbDeferTestMethod, self._ebDeferTestMethod,
674 callbackArgs=(result,),
675 errbackArgs=(result,))
676 d.addBoth(self.deferRunCleanups, result)
677 d.addBoth(self.deferTearDown, result)
678 if self._shared and hasattr(self, 'tearDownClass') and self._isLast():
679 d.addBoth(self.deferTearDownClass, result)
680 return d
681
682 def _cbDeferTestMethod(self, ignored, result):
683 if self.getTodo() is not None:
684 result.addUnexpectedSuccess(self, self.getTodo())
685 else:
686 self._passed = True
687 return ignored
688
689 def _ebDeferTestMethod(self, f, result):
690 todo = self.getTodo()
691 if todo is not None and todo.expected(f):
692 result.addExpectedFailure(self, f, todo)
693 elif f.check(self.failureException, FailTest):
694 result.addFailure(self, f)
695 elif f.check(KeyboardInterrupt):
696 result.addError(self, f)
697 result.stop()
698 elif f.check(SkipTest):
699 result.addSkip(self, self._getReason(f))
700 else:
701 result.addError(self, f)
702
703 def deferTearDown(self, ignored, result):
704 d = self._run('tearDown', result)
705 d.addErrback(self._ebDeferTearDown, result)
706 return d
707
708 def _ebDeferTearDown(self, failure, result):
709 result.addError(self, failure)
710 if failure.check(KeyboardInterrupt):
711 result.stop()
712 self._passed = False
713
714 def deferRunCleanups(self, ignored, result):
715 """
716 Run any scheduled cleanups and report errors (if any to the result
717 object.
718 """
719 d = self._runCleanups()
720 d.addCallback(self._cbDeferRunCleanups, result)
721 return d
722
723 def _cbDeferRunCleanups(self, cleanupResults, result):
724 for flag, failure in cleanupResults:
725 if flag == defer.FAILURE:
726 result.addError(self, failure)
727 if failure.check(KeyboardInterrupt):
728 result.stop()
729 self._passed = False
730
731 def deferTearDownClass(self, ignored, result):
732 d = self._run('tearDownClass', result)
733 d.addErrback(self._ebTearDownClass, result)
734 return d
735
736 def _ebTearDownClass(self, error, result):
737 if error.check(KeyboardInterrupt):
738 result.stop()
739 result.addError(self, error)
740
741 def _cleanUp(self, result):
742 try:
743 clean = util._Janitor(self, result).postCaseCleanup()
744 if not clean:
745 self._passed = False
746 except:
747 result.addError(self, failure.Failure())
748 self._passed = False
749 for error in self._observer.getErrors():
750 result.addError(self, error)
751 self._passed = False
752 self.flushLoggedErrors()
753 self._removeObserver()
754 if self._passed:
755 result.addSuccess(self)
756
757 def _classCleanUp(self, result):
758 try:
759 util._Janitor(self, result).postClassCleanup()
760 except:
761 result.addError(self, failure.Failure())
762
763 def _makeReactorMethod(self, name):
764 """
765 Create a method which wraps the reactor method C{name}. The new
766 method issues a deprecation warning and calls the original.
767 """
768 def _(*a, **kw):
769 warnings.warn("reactor.%s cannot be used inside unit tests. "
770 "In the future, using %s will fail the test and may "
771 "crash or hang the test run."
772 % (name, name),
773 stacklevel=2, category=DeprecationWarning)
774 return self._reactorMethods[name](*a, **kw)
775 return _
776
777 def _deprecateReactor(self, reactor):
778 """
779 Deprecate C{iterate}, C{crash} and C{stop} on C{reactor}. That is,
780 each method is wrapped in a function that issues a deprecation
781 warning, then calls the original.
782
783 @param reactor: The Twisted reactor.
784 """
785 self._reactorMethods = {}
786 for name in ['crash', 'iterate', 'stop']:
787 self._reactorMethods[name] = getattr(reactor, name)
788 setattr(reactor, name, self._makeReactorMethod(name))
789
790 def _undeprecateReactor(self, reactor):
791 """
792 Restore the deprecated reactor methods. Undoes what
793 L{_deprecateReactor} did.
794
795 @param reactor: The Twisted reactor.
796 """
797 for name, method in self._reactorMethods.iteritems():
798 setattr(reactor, name, method)
799 self._reactorMethods = {}
800
801 def _installObserver(self):
802 self._observer = _logObserver
803 self._observer._add()
804
805 def _removeObserver(self):
806 self._observer._remove()
807
808 def flushLoggedErrors(self, *errorTypes):
809 """
810 Remove stored errors received from the log.
811
812 C{TestCase} stores each error logged during the run of the test and
813 reports them as errors during the cleanup phase (after C{tearDown}).
814
815 @param *errorTypes: If unspecifed, flush all errors. Otherwise, only
816 flush errors that match the given types.
817
818 @return: A list of failures that have been removed.
819 """
820 return self._observer.flushErrors(*errorTypes)
821
822
823 def addCleanup(self, f, *args, **kwargs):
824 """
825 Add the given function to a list of functions to be called after the
826 test has run, but before C{tearDown}.
827
828 Functions will be run in reverse order of being added. This helps
829 ensure that tear down complements set up.
830
831 The function C{f} may return a Deferred. If so, C{TestCase} will wait
832 until the Deferred has fired before proceeding to the next function.
833 """
834 self._cleanups.append((f, args, kwargs))
835
836
837 def _captureDeprecationWarnings(self, f, *args, **kwargs):
838 """
839 Call C{f} and capture all deprecation warnings.
840 """
841 warnings = []
842 def accumulateDeprecations(message, category, stacklevel):
843 self.assertEqual(DeprecationWarning, category)
844 self.assertEqual(stacklevel, 2)
845 warnings.append(message)
846
847 originalMethod = deprecate.getWarningMethod()
848 deprecate.setWarningMethod(accumulateDeprecations)
849 try:
850 result = f(*args, **kwargs)
851 finally:
852 deprecate.setWarningMethod(originalMethod)
853 return (warnings, result)
854
855
856 def callDeprecated(self, version, f, *args, **kwargs):
857 """
858 Call a function that was deprecated at a specific version.
859
860 @param version: The version that the function was deprecated in.
861 @param f: The deprecated function to call.
862 @return: Whatever the function returns.
863 """
864 warnings, result = self._captureDeprecationWarnings(
865 f, *args, **kwargs)
866
867 if len(warnings) == 0:
868 self.fail('%r is not deprecated.' % (f,))
869
870 observedWarning = warnings[0]
871 expectedWarning = getDeprecationWarningString(f, version)
872 self.assertEqual(expectedWarning, observedWarning)
873
874 return result
875
876
877 def _runCleanups(self):
878 """
879 Run the cleanups added with L{addCleanup} in order.
880
881 @return: A C{Deferred} that fires when all cleanups are run.
882 """
883 def _makeFunction(f, args, kwargs):
884 return lambda: f(*args, **kwargs)
885 callables = []
886 while len(self._cleanups) > 0:
887 f, args, kwargs = self._cleanups.pop()
888 callables.append(_makeFunction(f, args, kwargs))
889 return util._runSequentially(callables)
890
891
892 def patch(self, obj, attribute, value):
893 """
894 Monkey patch an object for the duration of the test.
895
896 The monkey patch will be reverted at the end of the test using the
897 L{addCleanup} mechanism.
898
899 The L{MonkeyPatcher} is returned so that users can restore and
900 re-apply the monkey patch within their tests.
901
902 @param obj: The object to monkey patch.
903 @param attribute: The name of the attribute to change.
904 @param value: The value to set the attribute to.
905 @return: A L{monkey.MonkeyPatcher} object.
906 """
907 monkeyPatch = monkey.MonkeyPatcher((obj, attribute, value))
908 monkeyPatch.patch()
909 self.addCleanup(monkeyPatch.restore)
910 return monkeyPatch
911
912
913 def runTest(self):
914 """
915 If no C{methodName} argument is passed to the constructor, L{run} will
916 treat this method as the thing with the actual test inside.
917 """
918
919
920 def run(self, result):
921 """
922 Run the test case, storing the results in C{result}.
923
924 First runs C{setUp} on self, then runs the test method (defined in the
925 constructor), then runs C{tearDown}. Any of these may return
926 L{Deferred}s. After they complete, does some reactor cleanup.
927
928 @param result: A L{TestResult} object.
929 """
930 log.msg("--> %s <--" % (self.id()))
931 from twisted.internet import reactor
932 new_result = itrial.IReporter(result, None)
933 if new_result is None:
934 result = PyUnitResultAdapter(result)
935 else:
936 result = new_result
937 self._timedOut = False
938 if self._shared and self not in self.__class__._instances:
939 self.__class__._instances.add(self)
940 result.startTest(self)
941 if self.getSkip(): # don't run test methods that are marked as .skip
942 result.addSkip(self, self.getSkip())
943 result.stopTest(self)
944 return
945 self._installObserver()
946 self._passed = False
947 first = False
948 if self._shared:
949 first = self._isFirst()
950 self.__class__._instancesRun.add(self)
951 self._deprecateReactor(reactor)
952 try:
953 if first:
954 d = self.deferSetUpClass(result)
955 else:
956 d = self.deferSetUp(None, result)
957 try:
958 self._wait(d)
959 finally:
960 self._cleanUp(result)
961 result.stopTest(self)
962 if self._shared and self._isLast():
963 self._initInstances()
964 self._classCleanUp(result)
965 if not self._shared:
966 self._classCleanUp(result)
967 finally:
968 self._undeprecateReactor(reactor)
969
970 def _getReason(self, f):
971 if len(f.value.args) > 0:
972 reason = f.value.args[0]
973 else:
974 warnings.warn(("Do not raise unittest.SkipTest with no "
975 "arguments! Give a reason for skipping tests!"),
976 stacklevel=2)
977 reason = f
978 return reason
979
980 def getSkip(self):
981 """
982 Return the skip reason set on this test, if any is set. Checks on the
983 instance first, then the class, then the module, then packages. As
984 soon as it finds something with a C{skip} attribute, returns that.
985 Returns C{None} if it cannot find anything. See L{TestCase} docstring
986 for more details.
987 """
988 return util.acquireAttribute(self._parents, 'skip', None)
989
990 def getTodo(self):
991 """
992 Return a L{Todo} object if the test is marked todo. Checks on the
993 instance first, then the class, then the module, then packages. As
994 soon as it finds something with a C{todo} attribute, returns that.
995 Returns C{None} if it cannot find anything. See L{TestCase} docstring
996 for more details.
997 """
998 todo = util.acquireAttribute(self._parents, 'todo', None)
999 if todo is None:
1000 return None
1001 return makeTodo(todo)
1002
1003 def getTimeout(self):
1004 """
1005 Returns the timeout value set on this test. Checks on the instance
1006 first, then the class, then the module, then packages. As soon as it
1007 finds something with a C{timeout} attribute, returns that. Returns
1008 L{util.DEFAULT_TIMEOUT_DURATION} if it cannot find anything. See
1009 L{TestCase} docstring for more details.
1010 """
1011 timeout = util.acquireAttribute(self._parents, 'timeout',
1012 util.DEFAULT_TIMEOUT_DURATION)
1013 try:
1014 return float(timeout)
1015 except (ValueError, TypeError):
1016 # XXX -- this is here because sometimes people will have methods
1017 # called 'timeout', or set timeout to 'orange', or something
1018 # Particularly, test_news.NewsTestCase and ReactorCoreTestCase
1019 # both do this.
1020 warnings.warn("'timeout' attribute needs to be a number.",
1021 category=DeprecationWarning)
1022 return util.DEFAULT_TIMEOUT_DURATION
1023
1024 def getSuppress(self):
1025 """
1026 Returns any warning suppressions set for this test. Checks on the
1027 instance first, then the class, then the module, then packages. As
1028 soon as it finds something with a C{suppress} attribute, returns that.
1029 Returns any empty list (i.e. suppress no warnings) if it cannot find
1030 anything. See L{TestCase} docstring for more details.
1031 """
1032 return util.acquireAttribute(self._parents, 'suppress', [])
1033
1034
1035 def visit(self, visitor):
1036 """
1037 Visit this test case. Call C{visitor} with C{self} as a parameter.
1038
1039 Deprecated in Twisted 8.0.
1040
1041 @param visitor: A callable which expects a single parameter: a test
1042 case.
1043
1044 @return: None
1045 """
1046 warnings.warn("Test visitors deprecated in Twisted 8.0",
1047 category=DeprecationWarning)
1048 visitor(self)
1049
1050
1051 def mktemp(self):
1052 """Returns a unique name that may be used as either a temporary
1053 directory or filename.
1054
1055 @note: you must call os.mkdir on the value returned from this
1056 method if you wish to use it as a directory!
1057 """
1058 MAX_FILENAME = 32 # some platforms limit lengths of filenames
1059 base = os.path.join(self.__class__.__module__[:MAX_FILENAME],
1060 self.__class__.__name__[:MAX_FILENAME],
1061 self._testMethodName[:MAX_FILENAME])
1062 if not os.path.exists(base):
1063 os.makedirs(base)
1064 dirname = tempfile.mkdtemp('', '', base)
1065 return os.path.join(dirname, 'temp')
1066
1067 def _wait(self, d, running=_wait_is_running):
1068 """Take a Deferred that only ever callbacks. Block until it happens.
1069 """
1070 from twisted.internet import reactor
1071 if running:
1072 raise RuntimeError("_wait is not reentrant")
1073
1074 results = []
1075 def append(any):
1076 if results is not None:
1077 results.append(any)
1078 def crash(ign):
1079 if results is not None:
1080 reactor.crash()
1081 crash = utils.suppressWarnings(
1082 crash, util.suppress(message=r'reactor\.crash cannot be used.*',
1083 category=DeprecationWarning))
1084 def stop():
1085 reactor.crash()
1086 stop = utils.suppressWarnings(
1087 stop, util.suppress(message=r'reactor\.crash cannot be used.*',
1088 category=DeprecationWarning))
1089
1090 running.append(None)
1091 try:
1092 d.addBoth(append)
1093 if results:
1094 # d might have already been fired, in which case append is
1095 # called synchronously. Avoid any reactor stuff.
1096 return
1097 d.addBoth(crash)
1098 reactor.stop = stop
1099 try:
1100 reactor.run()
1101 finally:
1102 del reactor.stop
1103
1104 # If the reactor was crashed elsewhere due to a timeout, hopefully
1105 # that crasher also reported an error. Just return.
1106 # _timedOut is most likely to be set when d has fired but hasn't
1107 # completed its callback chain (see self._run)
1108 if results or self._timedOut: #defined in run() and _run()
1109 return
1110
1111 # If the timeout didn't happen, and we didn't get a result or
1112 # a failure, then the user probably aborted the test, so let's
1113 # just raise KeyboardInterrupt.
1114
1115 # FIXME: imagine this:
1116 # web/test/test_webclient.py:
1117 # exc = self.assertRaises(error.Error, wait, method(url))
1118 #
1119 # wait() will raise KeyboardInterrupt, and assertRaises will
1120 # swallow it. Therefore, wait() raising KeyboardInterrupt is
1121 # insufficient to stop trial. A suggested solution is to have
1122 # this code set a "stop trial" flag, or otherwise notify trial
1123 # that it should really try to stop as soon as possible.
1124 raise KeyboardInterrupt()
1125 finally:
1126 results = None
1127 running.pop()
1128
1129
1130 class UnsupportedTrialFeature(Exception):
1131 """A feature of twisted.trial was used that pyunit cannot support."""
1132
1133
1134 class PyUnitResultAdapter(object):
1135 """
1136 Wrap a C{TestResult} from the standard library's C{unittest} so that it
1137 supports the extended result types from Trial, and also supports
1138 L{twisted.python.failure.Failure}s being passed to L{addError} and
1139 L{addFailure}.
1140 """
1141
1142 def __init__(self, original):
1143 """
1144 @param original: A C{TestResult} instance from C{unittest}.
1145 """
1146 self.original = original
1147
1148 def _exc_info(self, err):
1149 if isinstance(err, failure.Failure):
1150 # Unwrap the Failure into a exc_info tuple.
1151 err = (err.type, err.value, err.getTracebackObject())
1152 return err
1153
1154 def startTest(self, method):
1155 self.original.startTest(method)
1156
1157 def stopTest(self, method):
1158 self.original.stopTest(method)
1159
1160 def addFailure(self, test, fail):
1161 self.original.addFailure(test, self._exc_info(fail))
1162
1163 def addError(self, test, error):
1164 self.original.addError(test, self._exc_info(error))
1165
1166 def _unsupported(self, test, feature, info):
1167 self.original.addFailure(
1168 test,
1169 (UnsupportedTrialFeature,
1170 UnsupportedTrialFeature(feature, info),
1171 None))
1172
1173 def addSkip(self, test, reason):
1174 """
1175 Report the skip as a failure.
1176 """
1177 self._unsupported(test, 'skip', reason)
1178
1179 def addUnexpectedSuccess(self, test, todo):
1180 """
1181 Report the unexpected success as a failure.
1182 """
1183 self._unsupported(test, 'unexpected success', todo)
1184
1185 def addExpectedFailure(self, test, error):
1186 """
1187 Report the expected failure (i.e. todo) as a failure.
1188 """
1189 self._unsupported(test, 'expected failure', error)
1190
1191 def addSuccess(self, test):
1192 self.original.addSuccess(test)
1193
1194 def upDownError(self, method, error, warn, printStatus):
1195 pass
1196
1197
1198
1199 def suiteVisit(suite, visitor):
1200 """
1201 Visit each test in C{suite} with C{visitor}.
1202
1203 Deprecated in Twisted 8.0.
1204
1205 @param visitor: A callable which takes a single argument, the L{TestCase}
1206 instance to visit.
1207 @return: None
1208 """
1209 warnings.warn("Test visitors deprecated in Twisted 8.0",
1210 category=DeprecationWarning)
1211 for case in suite._tests:
1212 visit = getattr(case, 'visit', None)
1213 if visit is not None:
1214 visit(visitor)
1215 elif isinstance(case, pyunit.TestCase):
1216 case = itrial.ITestCase(case)
1217 case.visit(visitor)
1218 elif isinstance(case, pyunit.TestSuite):
1219 suiteVisit(case, visitor)
1220 else:
1221 case.visit(visitor)
1222
1223
1224
1225 class TestSuite(pyunit.TestSuite):
1226 """
1227 Extend the standard library's C{TestSuite} with support for the visitor
1228 pattern and a consistently overrideable C{run} method.
1229 """
1230
1231 visit = suiteVisit
1232
1233 def __call__(self, result):
1234 return self.run(result)
1235
1236
1237 def run(self, result):
1238 """
1239 Call C{run} on every member of the suite.
1240 """
1241 # we implement this because Python 2.3 unittest defines this code
1242 # in __call__, whereas 2.4 defines the code in run.
1243 for test in self._tests:
1244 if result.shouldStop:
1245 break
1246 test(result)
1247 return result
1248
1249
1250
1251 class TestDecorator(components.proxyForInterface(itrial.ITestCase,
1252 "_originalTest")):
1253 """
1254 Decorator for test cases.
1255
1256 @param _originalTest: The wrapped instance of test.
1257 @type _originalTest: A provider of L{itrial.ITestCase}
1258 """
1259
1260 implements(itrial.ITestCase)
1261
1262
1263 def __call__(self, result):
1264 """
1265 Run the unit test.
1266
1267 @param result: A TestResult object.
1268 """
1269 return self.run(result)
1270
1271
1272 def run(self, result):
1273 """
1274 Run the unit test.
1275
1276 @param result: A TestResult object.
1277 """
1278 return self._originalTest.run(
1279 reporter._AdaptedReporter(result, self.__class__))
1280
1281
1282
1283 def _clearSuite(suite):
1284 """
1285 Clear all tests from C{suite}.
1286
1287 This messes with the internals of C{suite}. In particular, it assumes that
1288 the suite keeps all of its tests in a list in an instance variable called
1289 C{_tests}.
1290 """
1291 suite._tests = []
1292
1293
1294 def decorate(test, decorator):
1295 """
1296 Decorate all test cases in C{test} with C{decorator}.
1297
1298 C{test} can be a test case or a test suite. If it is a test suite, then the
1299 structure of the suite is preserved.
1300
1301 L{decorate} tries to preserve the class of the test suites it finds, but
1302 assumes the presence of the C{_tests} attribute on the suite.
1303
1304 @param test: The C{TestCase} or C{TestSuite} to decorate.
1305
1306 @param decorator: A unary callable used to decorate C{TestCase}s.
1307
1308 @return: A decorated C{TestCase} or a C{TestSuite} containing decorated
1309 C{TestCase}s.
1310 """
1311
1312 try:
1313 tests = iter(test)
1314 except TypeError:
1315 return decorator(test)
1316
1317 # At this point, we know that 'test' is a test suite.
1318 _clearSuite(test)
1319
1320 for case in tests:
1321 test.addTest(decorate(case, decorator))
1322 return test
1323
1324
1325
1326 class _PyUnitTestCaseAdapter(TestDecorator):
1327 """
1328 Adapt from pyunit.TestCase to ITestCase.
1329 """
1330
1331
1332 def visit(self, visitor):
1333 """
1334 Deprecated in Twisted 8.0.
1335 """
1336 warnings.warn("Test visitors deprecated in Twisted 8.0",
1337 category=DeprecationWarning)
1338 visitor(self)
1339
1340
1341
1342 class _BrokenIDTestCaseAdapter(_PyUnitTestCaseAdapter):
1343 """
1344 Adapter for pyunit-style C{TestCase} subclasses that have undesirable id()
1345 methods. That is L{pyunit.FunctionTestCase} and L{pyunit.DocTestCase}.
1346 """
1347
1348 def id(self):
1349 """
1350 Return the fully-qualified Python name of the doctest.
1351 """
1352 testID = self._originalTest.shortDescription()
1353 if testID is not None:
1354 return testID
1355 return self._originalTest.id()
1356
1357
1358
1359 class _ForceGarbageCollectionDecorator(TestDecorator):
1360 """
1361 Forces garbage collection to be run before and after the test. Any errors
1362 logged during the post-test collection are added to the test result as
1363 errors.
1364 """
1365
1366 def run(self, result):
1367 gc.collect()
1368 TestDecorator.run(self, result)
1369 _logObserver._add()
1370 gc.collect()
1371 for error in _logObserver.getErrors():
1372 result.addError(self, error)
1373 _logObserver.flushErrors()
1374 _logObserver._remove()
1375
1376
1377 components.registerAdapter(
1378 _PyUnitTestCaseAdapter, pyunit.TestCase, itrial.ITestCase)
1379
1380
1381 components.registerAdapter(
1382 _BrokenIDTestCaseAdapter, pyunit.FunctionTestCase, itrial.ITestCase)
1383
1384
1385 _docTestCase = getattr(doctest, 'DocTestCase', None)
1386 if _docTestCase:
1387 components.registerAdapter(
1388 _BrokenIDTestCaseAdapter, _docTestCase, itrial.ITestCase)
1389
1390
1391 def _iterateTests(testSuiteOrCase):
1392 """
1393 Iterate through all of the test cases in C{testSuiteOrCase}.
1394 """
1395 try:
1396 suite = iter(testSuiteOrCase)
1397 except TypeError:
1398 yield testSuiteOrCase
1399 else:
1400 for test in suite:
1401 for subtest in _iterateTests(test):
1402 yield subtest
1403
1404
1405
1406 # Support for Python 2.3
1407 try:
1408 iter(pyunit.TestSuite())
1409 except TypeError:
1410 # Python 2.3's TestSuite doesn't support iteration. Let's monkey patch it!
1411 def __iter__(self):
1412 return iter(self._tests)
1413 pyunit.TestSuite.__iter__ = __iter__
1414
1415
1416
1417 class _SubTestCase(TestCase):
1418 def __init__(self):
1419 TestCase.__init__(self, 'run')
1420
1421 _inst = _SubTestCase()
1422
1423 def _deprecate(name):
1424 """
1425 Internal method used to deprecate top-level assertions. Do not use this.
1426 """
1427 def _(*args, **kwargs):
1428 warnings.warn("unittest.%s is deprecated. Instead use the %r "
1429 "method on unittest.TestCase" % (name, name),
1430 stacklevel=2, category=DeprecationWarning)
1431 return getattr(_inst, name)(*args, **kwargs)
1432 return _
1433
1434
1435 _assertions = ['fail', 'failUnlessEqual', 'failIfEqual', 'failIfEquals',
1436 'failUnless', 'failUnlessIdentical', 'failUnlessIn',
1437 'failIfIdentical', 'failIfIn', 'failIf',
1438 'failUnlessAlmostEqual', 'failIfAlmostEqual',
1439 'failUnlessRaises', 'assertApproximates',
1440 'assertFailure', 'failUnlessSubstring', 'failIfSubstring',
1441 'assertAlmostEqual', 'assertAlmostEquals',
1442 'assertNotAlmostEqual', 'assertNotAlmostEquals', 'assertEqual',
1443 'assertEquals', 'assertNotEqual', 'assertNotEquals',
1444 'assertRaises', 'assert_', 'assertIdentical',
1445 'assertNotIdentical', 'assertIn', 'assertNotIn',
1446 'failUnlessFailure', 'assertSubstring', 'assertNotSubstring']
1447
1448
1449 for methodName in _assertions:
1450 globals()[methodName] = _deprecate(methodName)
1451
1452
1453 __all__ = ['TestCase', 'wait', 'FailTest', 'SkipTest']
1454
OLDNEW
« no previous file with comments | « third_party/twisted_8_1/twisted/trial/test/weird.py ('k') | third_party/twisted_8_1/twisted/trial/util.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698