OLD | NEW |
| (Empty) |
1 # Copyright (c) 2005-2007 Twisted Matrix Laboratories. | |
2 # See LICENSE for details. | |
3 # | |
4 # Maintainer: Jonathan Lange <jml@twistedmatrix.com> | |
5 # Author: Robert Collins <robertc@robertcollins.net> | |
6 | |
7 | |
8 import StringIO | |
9 from zope.interface import implements | |
10 | |
11 from twisted.trial.itrial import IReporter, ITestCase | |
12 from twisted.trial import unittest, runner, reporter, util | |
13 from twisted.python import failure, log, reflect | |
14 from twisted.scripts import trial | |
15 from twisted.plugins import twisted_trial | |
16 from twisted import plugin | |
17 from twisted.internet import defer | |
18 | |
19 | |
20 pyunit = __import__('unittest') | |
21 | |
22 | |
23 class CapturingDebugger(object): | |
24 | |
25 def __init__(self): | |
26 self._calls = [] | |
27 | |
28 def runcall(self, *args, **kwargs): | |
29 self._calls.append('runcall') | |
30 args[0](*args[1:], **kwargs) | |
31 | |
32 | |
33 | |
34 class CapturingReporter(object): | |
35 """ | |
36 Reporter that keeps a log of all actions performed on it. | |
37 """ | |
38 | |
39 implements(IReporter) | |
40 | |
41 stream = None | |
42 tbformat = None | |
43 args = None | |
44 separator = None | |
45 testsRun = None | |
46 | |
47 def __init__(self, *a, **kw): | |
48 """ | |
49 Create a capturing reporter. | |
50 """ | |
51 self._calls = [] | |
52 self.shouldStop = False | |
53 | |
54 | |
55 def startTest(self, method): | |
56 """ | |
57 Report the beginning of a run of a single test method | |
58 @param method: an object that is adaptable to ITestMethod | |
59 """ | |
60 self._calls.append('startTest') | |
61 | |
62 | |
63 def stopTest(self, method): | |
64 """ | |
65 Report the status of a single test method | |
66 @param method: an object that is adaptable to ITestMethod | |
67 """ | |
68 self._calls.append('stopTest') | |
69 | |
70 | |
71 def cleanupErrors(self, errs): | |
72 """called when the reactor has been left in a 'dirty' state | |
73 @param errs: a list of L{twisted.python.failure.Failure}s | |
74 """ | |
75 self._calls.append('cleanupError') | |
76 | |
77 | |
78 def addSuccess(self, test): | |
79 self._calls.append('addSuccess') | |
80 | |
81 | |
82 def done(self): | |
83 """ | |
84 Do nothing. These tests don't care about done. | |
85 """ | |
86 | |
87 | |
88 | |
89 class TestTrialRunner(unittest.TestCase): | |
90 | |
91 def setUp(self): | |
92 self.stream = StringIO.StringIO() | |
93 self.runner = runner.TrialRunner(CapturingReporter, stream=self.stream) | |
94 self.test = TestTrialRunner('test_empty') | |
95 | |
96 def test_empty(self): | |
97 """ | |
98 Empty test method, used by the other tests. | |
99 """ | |
100 | |
101 def tearDown(self): | |
102 self.runner._tearDownLogFile() | |
103 | |
104 def _getObservers(self): | |
105 return log.theLogPublisher.observers | |
106 | |
107 def test_addObservers(self): | |
108 """ | |
109 Tests that the runner add logging observers during the run. | |
110 """ | |
111 originalCount = len(self._getObservers()) | |
112 self.runner.run(self.test) | |
113 newCount = len(self._getObservers()) | |
114 self.failUnlessEqual(originalCount + 1, newCount) | |
115 | |
116 def test_addObservers_repeat(self): | |
117 self.runner.run(self.test) | |
118 count = len(self._getObservers()) | |
119 self.runner.run(self.test) | |
120 newCount = len(self._getObservers()) | |
121 self.failUnlessEqual(count, newCount) | |
122 | |
123 def test_logFileAlwaysActive(self): | |
124 """ | |
125 Test that a new file is opened on each run. | |
126 """ | |
127 oldSetUpLogging = self.runner._setUpLogging | |
128 l = [] | |
129 def setUpLogging(): | |
130 oldSetUpLogging() | |
131 l.append(self.runner._logFileObserver) | |
132 self.runner._setUpLogging = setUpLogging | |
133 self.runner.run(self.test) | |
134 self.runner.run(self.test) | |
135 self.failUnlessEqual(len(l), 2) | |
136 self.failIf(l[0] is l[1], "Should have created a new file observer") | |
137 | |
138 def test_logFileGetsClosed(self): | |
139 """ | |
140 Test that file created is closed during the run. | |
141 """ | |
142 oldSetUpLogging = self.runner._setUpLogging | |
143 l = [] | |
144 def setUpLogging(): | |
145 oldSetUpLogging() | |
146 l.append(self.runner._logFileObject) | |
147 self.runner._setUpLogging = setUpLogging | |
148 self.runner.run(self.test) | |
149 self.failUnlessEqual(len(l), 1) | |
150 self.failUnless(l[0].closed) | |
151 | |
152 | |
153 | |
154 class TrialRunnerWithUncleanWarningsReporter(TestTrialRunner): | |
155 """ | |
156 Tests for the TrialRunner's interaction with an unclean-error suppressing | |
157 reporter. | |
158 """ | |
159 | |
160 def setUp(self): | |
161 self.stream = StringIO.StringIO() | |
162 self.runner = runner.TrialRunner(CapturingReporter, stream=self.stream, | |
163 uncleanWarnings=True) | |
164 self.test = TestTrialRunner('test_empty') | |
165 | |
166 | |
167 | |
168 class DryRunMixin(object): | |
169 | |
170 suppress = [util.suppress( | |
171 category=DeprecationWarning, | |
172 message="Test visitors deprecated in Twisted 8.0")] | |
173 | |
174 | |
175 def setUp(self): | |
176 self.log = [] | |
177 self.stream = StringIO.StringIO() | |
178 self.runner = runner.TrialRunner(CapturingReporter, | |
179 runner.TrialRunner.DRY_RUN, | |
180 stream=self.stream) | |
181 self.makeTestFixtures() | |
182 | |
183 | |
184 def makeTestFixtures(self): | |
185 """ | |
186 Set C{self.test} and C{self.suite}, where C{self.suite} is an empty | |
187 TestSuite. | |
188 """ | |
189 | |
190 | |
191 def test_empty(self): | |
192 """ | |
193 If there are no tests, the reporter should not receive any events to | |
194 report. | |
195 """ | |
196 result = self.runner.run(runner.TestSuite()) | |
197 self.assertEqual(result._calls, []) | |
198 | |
199 | |
200 def test_singleCaseReporting(self): | |
201 """ | |
202 If we are running a single test, check the reporter starts, passes and | |
203 then stops the test during a dry run. | |
204 """ | |
205 result = self.runner.run(self.test) | |
206 self.assertEqual(result._calls, ['startTest', 'addSuccess', 'stopTest']) | |
207 | |
208 | |
209 def test_testsNotRun(self): | |
210 """ | |
211 When we are doing a dry run, the tests should not actually be run. | |
212 """ | |
213 self.runner.run(self.test) | |
214 self.assertEqual(self.log, []) | |
215 | |
216 | |
217 | |
218 class DryRunTest(DryRunMixin, unittest.TestCase): | |
219 """ | |
220 Check that 'dry run' mode works well with Trial tests. | |
221 """ | |
222 def makeTestFixtures(self): | |
223 class MockTest(unittest.TestCase): | |
224 def test_foo(test): | |
225 self.log.append('test_foo') | |
226 self.test = MockTest('test_foo') | |
227 self.suite = runner.TestSuite() | |
228 | |
229 | |
230 | |
231 class PyUnitDryRunTest(DryRunMixin, unittest.TestCase): | |
232 """ | |
233 Check that 'dry run' mode works well with stdlib unittest tests. | |
234 """ | |
235 def makeTestFixtures(self): | |
236 class PyunitCase(pyunit.TestCase): | |
237 def test_foo(self): | |
238 pass | |
239 self.test = PyunitCase('test_foo') | |
240 self.suite = pyunit.TestSuite() | |
241 | |
242 | |
243 | |
244 class TestRunner(unittest.TestCase): | |
245 def setUp(self): | |
246 self.config = trial.Options() | |
247 # whitebox hack a reporter in, because plugins are CACHED and will | |
248 # only reload if the FILE gets changed. | |
249 | |
250 parts = reflect.qual(CapturingReporter).split('.') | |
251 package = '.'.join(parts[:-1]) | |
252 klass = parts[-1] | |
253 plugins = [twisted_trial._Reporter( | |
254 "Test Helper Reporter", | |
255 package, | |
256 description="Utility for unit testing.", | |
257 longOpt="capturing", | |
258 shortOpt=None, | |
259 klass=klass)] | |
260 | |
261 | |
262 # XXX There should really be a general way to hook the plugin system | |
263 # for tests. | |
264 def getPlugins(iface, *a, **kw): | |
265 self.assertEqual(iface, IReporter) | |
266 return plugins + list(self.original(iface, *a, **kw)) | |
267 | |
268 self.original = plugin.getPlugins | |
269 plugin.getPlugins = getPlugins | |
270 | |
271 self.standardReport = ['startTest', 'addSuccess', 'stopTest', | |
272 'startTest', 'addSuccess', 'stopTest', | |
273 'startTest', 'addSuccess', 'stopTest', | |
274 'startTest', 'addSuccess', 'stopTest', | |
275 'startTest', 'addSuccess', 'stopTest', | |
276 'startTest', 'addSuccess', 'stopTest', | |
277 'startTest', 'addSuccess', 'stopTest'] | |
278 | |
279 | |
280 def tearDown(self): | |
281 plugin.getPlugins = self.original | |
282 | |
283 | |
284 def parseOptions(self, args): | |
285 self.config.parseOptions(args) | |
286 | |
287 | |
288 def getRunner(self): | |
289 r = trial._makeRunner(self.config) | |
290 r.stream = StringIO.StringIO() | |
291 self.addCleanup(r._tearDownLogFile) | |
292 return r | |
293 | |
294 | |
295 def test_runner_can_get_reporter(self): | |
296 self.parseOptions([]) | |
297 result = self.config['reporter'] | |
298 my_runner = self.getRunner() | |
299 try: | |
300 self.assertEqual(result, my_runner._makeResult().__class__) | |
301 finally: | |
302 my_runner._tearDownLogFile() | |
303 | |
304 | |
305 def test_runner_get_result(self): | |
306 self.parseOptions([]) | |
307 my_runner = self.getRunner() | |
308 result = my_runner._makeResult() | |
309 self.assertEqual(result.__class__, self.config['reporter']) | |
310 | |
311 | |
312 def test_uncleanWarningsOffByDefault(self): | |
313 """ | |
314 By default Trial sets the 'uncleanWarnings' option on the runner to | |
315 False. This means that dirty reactor errors will be reported as | |
316 errors. See L{test_reporter.TestDirtyReactor}. | |
317 """ | |
318 self.parseOptions([]) | |
319 runner = self.getRunner() | |
320 self.assertNotIsInstance(runner._makeResult(), | |
321 reporter.UncleanWarningsReporterWrapper) | |
322 | |
323 | |
324 def test_getsUncleanWarnings(self): | |
325 """ | |
326 Specifying '--unclean-warnings' on the trial command line will cause | |
327 reporters to be wrapped in a device which converts unclean errors to | |
328 warnings. See L{test_reporter.TestDirtyReactor} for implications. | |
329 """ | |
330 self.parseOptions(['--unclean-warnings']) | |
331 runner = self.getRunner() | |
332 self.assertIsInstance(runner._makeResult(), | |
333 reporter.UncleanWarningsReporterWrapper) | |
334 | |
335 | |
336 def test_runner_working_directory(self): | |
337 self.parseOptions(['--temp-directory', 'some_path']) | |
338 runner = self.getRunner() | |
339 try: | |
340 self.assertEquals(runner.workingDirectory, 'some_path') | |
341 finally: | |
342 runner._tearDownLogFile() | |
343 | |
344 | |
345 def test_runner_normal(self): | |
346 self.parseOptions(['--temp-directory', self.mktemp(), | |
347 '--reporter', 'capturing', | |
348 'twisted.trial.test.sample']) | |
349 my_runner = self.getRunner() | |
350 loader = runner.TestLoader() | |
351 suite = loader.loadByName('twisted.trial.test.sample', True) | |
352 result = my_runner.run(suite) | |
353 self.assertEqual(self.standardReport, result._calls) | |
354 | |
355 | |
356 def test_runner_debug(self): | |
357 self.parseOptions(['--reporter', 'capturing', | |
358 '--debug', 'twisted.trial.test.sample']) | |
359 my_runner = self.getRunner() | |
360 debugger = CapturingDebugger() | |
361 def get_debugger(): | |
362 return debugger | |
363 my_runner._getDebugger = get_debugger | |
364 loader = runner.TestLoader() | |
365 suite = loader.loadByName('twisted.trial.test.sample', True) | |
366 result = my_runner.run(suite) | |
367 self.assertEqual(self.standardReport, result._calls) | |
368 self.assertEqual(['runcall'], debugger._calls) | |
369 | |
370 | |
371 | |
372 class TestTrialSuite(unittest.TestCase): | |
373 | |
374 def test_imports(self): | |
375 # FIXME, HTF do you test the reactor can be cleaned up ?!!! | |
376 from twisted.trial.runner import TrialSuite | |
377 # silence pyflakes warning | |
378 silencePyflakes = TrialSuite | |
379 | |
380 | |
381 | |
382 class TestUntilFailure(unittest.TestCase): | |
383 class FailAfter(unittest.TestCase): | |
384 """ | |
385 A test case that fails when run 3 times in a row. | |
386 """ | |
387 count = [] | |
388 def test_foo(self): | |
389 self.count.append(None) | |
390 if len(self.count) == 3: | |
391 self.fail('Count reached 3') | |
392 | |
393 def setUp(self): | |
394 TestUntilFailure.FailAfter.count = [] | |
395 self.test = TestUntilFailure.FailAfter('test_foo') | |
396 self.stream = StringIO.StringIO() | |
397 self.runner = runner.TrialRunner(reporter.Reporter, stream=self.stream) | |
398 | |
399 def test_runUntilFailure(self): | |
400 """ | |
401 Test that the runUntilFailure method of the runner actually fail after | |
402 a few runs. | |
403 """ | |
404 result = self.runner.runUntilFailure(self.test) | |
405 self.failUnlessEqual(result.testsRun, 1) | |
406 self.failIf(result.wasSuccessful()) | |
407 self.assertEquals(self._getFailures(result), 1) | |
408 | |
409 def _getFailures(self, result): | |
410 """ | |
411 Get the number of failures that were reported to a result. | |
412 """ | |
413 return len(result.failures) | |
414 | |
415 | |
416 | |
417 class UncleanUntilFailureTests(TestUntilFailure): | |
418 """ | |
419 Test that the run-until-failure feature works correctly with the unclean | |
420 error suppressor. | |
421 """ | |
422 | |
423 def setUp(self): | |
424 TestUntilFailure.setUp(self) | |
425 self.runner = runner.TrialRunner(reporter.Reporter, stream=self.stream, | |
426 uncleanWarnings=True) | |
427 | |
428 def _getFailures(self, result): | |
429 """ | |
430 Get the number of failures that were reported to a result that | |
431 is wrapped in an UncleanFailureWrapper. | |
432 """ | |
433 return len(result._originalReporter.failures) | |
434 | |
435 | |
436 | |
437 class BreakingSuite(runner.TestSuite): | |
438 """ | |
439 A L{TestSuite} that logs an error when it is run. | |
440 """ | |
441 | |
442 def run(self, result): | |
443 try: | |
444 raise RuntimeError("error that occurs outside of a test") | |
445 except RuntimeError, e: | |
446 log.err(failure.Failure()) | |
447 | |
448 | |
449 | |
450 class TestLoggedErrors(unittest.TestCase): | |
451 """ | |
452 It is possible for an error generated by a test to be logged I{outside} of | |
453 any test. The log observers constructed by L{TestCase} won't catch these | |
454 errors. Here we try to generate such errors and ensure they are reported to | |
455 a L{TestResult} object. | |
456 """ | |
457 | |
458 def tearDown(self): | |
459 self.flushLoggedErrors(RuntimeError) | |
460 | |
461 | |
462 def test_construct(self): | |
463 """ | |
464 Check that we can construct a L{runner.LoggedSuite} and that it | |
465 starts empty. | |
466 """ | |
467 suite = runner.LoggedSuite() | |
468 self.assertEqual(suite.countTestCases(), 0) | |
469 | |
470 | |
471 def test_capturesError(self): | |
472 """ | |
473 Chek that a L{LoggedSuite} reports any logged errors to its result. | |
474 """ | |
475 result = reporter.TestResult() | |
476 suite = runner.LoggedSuite([BreakingSuite()]) | |
477 suite.run(result) | |
478 self.assertEqual(len(result.errors), 1) | |
479 self.assertEqual(result.errors[0][0].id(), runner.NOT_IN_TEST) | |
480 self.failUnless(result.errors[0][1].check(RuntimeError)) | |
481 | |
482 | |
483 | |
484 class TestTestHolder(unittest.TestCase): | |
485 | |
486 def setUp(self): | |
487 self.description = "description" | |
488 self.holder = runner.TestHolder(self.description) | |
489 | |
490 | |
491 def test_holder(self): | |
492 """ | |
493 Check that L{runner.TestHolder} takes a description as a parameter | |
494 and that this description is returned by the C{id} and | |
495 C{shortDescription} methods. | |
496 """ | |
497 self.assertEqual(self.holder.id(), self.description) | |
498 self.assertEqual(self.holder.shortDescription(), self.description) | |
499 | |
500 | |
501 def test_holderImplementsITestCase(self): | |
502 """ | |
503 L{runner.TestHolder} implements L{ITestCase}. | |
504 """ | |
505 self.assertIdentical(self.holder, ITestCase(self.holder)) | |
506 | |
507 | |
508 | |
509 class TestErrorHolder(TestTestHolder): | |
510 """ | |
511 Test L{runner.ErrorHolder} shares behaviour with L{runner.TestHolder}. | |
512 """ | |
513 | |
514 def setUp(self): | |
515 self.description = "description" | |
516 # make a real Failure so we can construct ErrorHolder() | |
517 try: | |
518 1/0 | |
519 except ZeroDivisionError: | |
520 error = failure.Failure() | |
521 self.holder = runner.ErrorHolder(self.description, error) | |
522 | |
523 | |
524 | |
525 class TestMalformedMethod(unittest.TestCase): | |
526 """ | |
527 Test that trial manages when test methods don't have correct signatures. | |
528 """ | |
529 class ContainMalformed(unittest.TestCase): | |
530 """ | |
531 This TestCase holds malformed test methods that trial should handle. | |
532 """ | |
533 def test_foo(self, blah): | |
534 pass | |
535 def test_bar(): | |
536 pass | |
537 test_spam = defer.deferredGenerator(test_bar) | |
538 | |
539 def _test(self, method): | |
540 """ | |
541 Wrapper for one of the test method of L{ContainMalformed}. | |
542 """ | |
543 stream = StringIO.StringIO() | |
544 trialRunner = runner.TrialRunner(reporter.Reporter, stream=stream) | |
545 test = TestMalformedMethod.ContainMalformed(method) | |
546 result = trialRunner.run(test) | |
547 self.failUnlessEqual(result.testsRun, 1) | |
548 self.failIf(result.wasSuccessful()) | |
549 self.failUnlessEqual(len(result.errors), 1) | |
550 | |
551 def test_extraArg(self): | |
552 """ | |
553 Test when the method has extra (useless) arguments. | |
554 """ | |
555 self._test('test_foo') | |
556 | |
557 def test_noArg(self): | |
558 """ | |
559 Test when the method doesn't have even self as argument. | |
560 """ | |
561 self._test('test_bar') | |
562 | |
563 def test_decorated(self): | |
564 """ | |
565 Test a decorated method also fails. | |
566 """ | |
567 self._test('test_spam') | |
568 | |
569 | |
570 | |
571 class DestructiveTestSuiteTestCase(unittest.TestCase): | |
572 """ | |
573 Test for L{runner.DestructiveTestSuite}. | |
574 """ | |
575 | |
576 def test_basic(self): | |
577 """ | |
578 Thes destructive test suite should run the tests normally. | |
579 """ | |
580 called = [] | |
581 class MockTest(unittest.TestCase): | |
582 def test_foo(test): | |
583 called.append(True) | |
584 test = MockTest('test_foo') | |
585 result = reporter.TestResult() | |
586 suite = runner.DestructiveTestSuite([test]) | |
587 self.assertEquals(called, []) | |
588 suite.run(result) | |
589 self.assertEquals(called, [True]) | |
590 self.assertEquals(suite.countTestCases(), 0) | |
591 | |
592 | |
593 def test_shouldStop(self): | |
594 """ | |
595 Test the C{shouldStop} management: raising a C{KeyboardInterrupt} must | |
596 interrupt the suite. | |
597 """ | |
598 called = [] | |
599 class MockTest(unittest.TestCase): | |
600 def test_foo1(test): | |
601 called.append(1) | |
602 def test_foo2(test): | |
603 raise KeyboardInterrupt() | |
604 def test_foo3(test): | |
605 called.append(2) | |
606 result = reporter.TestResult() | |
607 loader = runner.TestLoader() | |
608 loader.suiteFactory = runner.DestructiveTestSuite | |
609 suite = loader.loadClass(MockTest) | |
610 self.assertEquals(called, []) | |
611 suite.run(result) | |
612 self.assertEquals(called, [1]) | |
613 # The last test shouldn't have been run | |
614 self.assertEquals(suite.countTestCases(), 1) | |
615 | |
616 | |
617 def test_cleanup(self): | |
618 """ | |
619 Checks that the test suite cleanups its tests during the run, so that | |
620 it ends empty. | |
621 """ | |
622 class MockTest(unittest.TestCase): | |
623 def test_foo(test): | |
624 pass | |
625 test = MockTest('test_foo') | |
626 result = reporter.TestResult() | |
627 suite = runner.DestructiveTestSuite([test]) | |
628 self.assertEquals(suite.countTestCases(), 1) | |
629 suite.run(result) | |
630 self.assertEquals(suite.countTestCases(), 0) | |
631 | |
632 | |
633 | |
634 class TestRunnerDeprecation(unittest.TestCase): | |
635 | |
636 class FakeReporter(reporter.Reporter): | |
637 """ | |
638 Fake reporter that does *not* implement done() but *does* implement | |
639 printErrors, separator, printSummary, stream, write and writeln | |
640 without deprecations. | |
641 """ | |
642 | |
643 done = None | |
644 separator = None | |
645 stream = None | |
646 | |
647 def printErrors(self, *args): | |
648 pass | |
649 | |
650 def printSummary(self, *args): | |
651 pass | |
652 | |
653 def write(self, *args): | |
654 pass | |
655 | |
656 def writeln(self, *args): | |
657 pass | |
658 | |
659 | |
660 def test_reporterDeprecations(self): | |
661 """ | |
662 The runner emits a warning if it is using a result that doesn't | |
663 implement 'done'. | |
664 """ | |
665 trialRunner = runner.TrialRunner(None) | |
666 result = self.FakeReporter() | |
667 trialRunner._makeResult = lambda: result | |
668 def f(): | |
669 # We have to use a pyunit test, otherwise we'll get deprecation | |
670 # warnings about using iterate() in a test. | |
671 trialRunner.run(pyunit.TestCase('id')) | |
672 self.assertWarns( | |
673 DeprecationWarning, | |
674 "%s should implement done() but doesn't. Falling back to " | |
675 "printErrors() and friends." % reflect.qual(result.__class__), | |
676 __file__, f) | |
OLD | NEW |