Index: third_party/twisted_8_1/twisted/trial/test/test_util.py |
diff --git a/third_party/twisted_8_1/twisted/trial/test/test_util.py b/third_party/twisted_8_1/twisted/trial/test/test_util.py |
deleted file mode 100644 |
index 6357d4c9073562539f911c7e8d26a66f5e9a27db..0000000000000000000000000000000000000000 |
--- a/third_party/twisted_8_1/twisted/trial/test/test_util.py |
+++ /dev/null |
@@ -1,533 +0,0 @@ |
-import os |
- |
-from zope.interface import implements |
- |
-from twisted.internet.interfaces import IProcessTransport |
-from twisted.internet import defer |
-from twisted.internet.base import DelayedCall |
- |
-from twisted.trial.unittest import TestCase |
-from twisted.trial import util |
-from twisted.trial.util import DirtyReactorAggregateError, _Janitor |
-from twisted.trial.test import packages |
- |
- |
- |
-class TestMktemp(TestCase): |
- def test_name(self): |
- name = self.mktemp() |
- dirs = os.path.dirname(name).split(os.sep)[:-1] |
- self.failUnlessEqual( |
- dirs, ['twisted.trial.test.test_util', 'TestMktemp', 'test_name']) |
- |
- def test_unique(self): |
- name = self.mktemp() |
- self.failIfEqual(name, self.mktemp()) |
- |
- def test_created(self): |
- name = self.mktemp() |
- dirname = os.path.dirname(name) |
- self.failUnless(os.path.exists(dirname)) |
- self.failIf(os.path.exists(name)) |
- |
- def test_location(self): |
- path = os.path.abspath(self.mktemp()) |
- self.failUnless(path.startswith(os.getcwd())) |
- |
- |
-class TestIntrospection(TestCase): |
- def test_containers(self): |
- import suppression |
- parents = util.getPythonContainers( |
- suppression.TestSuppression2.testSuppressModule) |
- expected = [suppression.TestSuppression2, suppression] |
- for a, b in zip(parents, expected): |
- self.failUnlessEqual(a, b) |
- |
- |
-class TestFindObject(packages.SysPathManglingTest): |
- def test_importPackage(self): |
- package1 = util.findObject('package') |
- import package as package2 |
- self.failUnlessEqual(package1, (True, package2)) |
- |
- def test_importModule(self): |
- test_sample2 = util.findObject('goodpackage.test_sample') |
- from goodpackage import test_sample |
- self.failUnlessEqual((True, test_sample), test_sample2) |
- |
- def test_importError(self): |
- self.failUnlessRaises(ZeroDivisionError, |
- util.findObject, 'package.test_bad_module') |
- |
- def test_sophisticatedImportError(self): |
- self.failUnlessRaises(ImportError, |
- util.findObject, 'package2.test_module') |
- |
- def test_importNonexistentPackage(self): |
- self.failUnlessEqual(util.findObject('doesntexist')[0], False) |
- |
- def test_findNonexistentModule(self): |
- self.failUnlessEqual(util.findObject('package.doesntexist')[0], False) |
- |
- def test_findNonexistentObject(self): |
- self.failUnlessEqual(util.findObject( |
- 'goodpackage.test_sample.doesnt')[0], False) |
- self.failUnlessEqual(util.findObject( |
- 'goodpackage.test_sample.AlphabetTest.doesntexist')[0], False) |
- |
- def test_findObjectExist(self): |
- alpha1 = util.findObject('goodpackage.test_sample.AlphabetTest') |
- from goodpackage import test_sample |
- self.failUnlessEqual(alpha1, (True, test_sample.AlphabetTest)) |
- |
- |
- |
-class TestRunSequentially(TestCase): |
- """ |
- Sometimes it is useful to be able to run an arbitrary list of callables, |
- one after the other. |
- |
- When some of those callables can return Deferreds, things become complex. |
- """ |
- |
- def test_emptyList(self): |
- """ |
- When asked to run an empty list of callables, runSequentially returns a |
- successful Deferred that fires an empty list. |
- """ |
- d = util._runSequentially([]) |
- d.addCallback(self.assertEqual, []) |
- return d |
- |
- |
- def test_singleSynchronousSuccess(self): |
- """ |
- When given a callable that succeeds without returning a Deferred, |
- include the return value in the results list, tagged with a SUCCESS |
- flag. |
- """ |
- d = util._runSequentially([lambda: None]) |
- d.addCallback(self.assertEqual, [(defer.SUCCESS, None)]) |
- return d |
- |
- |
- def test_singleSynchronousFailure(self): |
- """ |
- When given a callable that raises an exception, include a Failure for |
- that exception in the results list, tagged with a FAILURE flag. |
- """ |
- d = util._runSequentially([lambda: self.fail('foo')]) |
- def check(results): |
- [(flag, fail)] = results |
- fail.trap(self.failureException) |
- self.assertEqual(fail.getErrorMessage(), 'foo') |
- self.assertEqual(flag, defer.FAILURE) |
- return d.addCallback(check) |
- |
- |
- def test_singleAsynchronousSuccess(self): |
- """ |
- When given a callable that returns a successful Deferred, include the |
- result of the Deferred in the results list, tagged with a SUCCESS flag. |
- """ |
- d = util._runSequentially([lambda: defer.succeed(None)]) |
- d.addCallback(self.assertEqual, [(defer.SUCCESS, None)]) |
- return d |
- |
- |
- def test_singleAsynchronousFailure(self): |
- """ |
- When given a callable that returns a failing Deferred, include the |
- failure the results list, tagged with a FAILURE flag. |
- """ |
- d = util._runSequentially([lambda: defer.fail(ValueError('foo'))]) |
- def check(results): |
- [(flag, fail)] = results |
- fail.trap(ValueError) |
- self.assertEqual(fail.getErrorMessage(), 'foo') |
- self.assertEqual(flag, defer.FAILURE) |
- return d.addCallback(check) |
- |
- |
- def test_callablesCalledInOrder(self): |
- """ |
- Check that the callables are called in the given order, one after the |
- other. |
- """ |
- log = [] |
- deferreds = [] |
- |
- def append(value): |
- d = defer.Deferred() |
- log.append(value) |
- deferreds.append(d) |
- return d |
- |
- d = util._runSequentially([lambda: append('foo'), |
- lambda: append('bar')]) |
- |
- # runSequentially should wait until the Deferred has fired before |
- # running the second callable. |
- self.assertEqual(log, ['foo']) |
- deferreds[-1].callback(None) |
- self.assertEqual(log, ['foo', 'bar']) |
- |
- # Because returning created Deferreds makes jml happy. |
- deferreds[-1].callback(None) |
- return d |
- |
- |
- def test_continuesAfterError(self): |
- """ |
- If one of the callables raises an error, then runSequentially continues |
- to run the remaining callables. |
- """ |
- d = util._runSequentially([lambda: self.fail('foo'), lambda: 'bar']) |
- def check(results): |
- [(flag1, fail), (flag2, result)] = results |
- fail.trap(self.failureException) |
- self.assertEqual(flag1, defer.FAILURE) |
- self.assertEqual(fail.getErrorMessage(), 'foo') |
- self.assertEqual(flag2, defer.SUCCESS) |
- self.assertEqual(result, 'bar') |
- return d.addCallback(check) |
- |
- |
- def test_stopOnFirstError(self): |
- """ |
- If the C{stopOnFirstError} option is passed to C{runSequentially}, then |
- no further callables are called after the first exception is raised. |
- """ |
- d = util._runSequentially([lambda: self.fail('foo'), lambda: 'bar'], |
- stopOnFirstError=True) |
- def check(results): |
- [(flag1, fail)] = results |
- fail.trap(self.failureException) |
- self.assertEqual(flag1, defer.FAILURE) |
- self.assertEqual(fail.getErrorMessage(), 'foo') |
- return d.addCallback(check) |
- |
- |
- def test_stripFlags(self): |
- """ |
- If the C{stripFlags} option is passed to C{runSequentially} then the |
- SUCCESS / FAILURE flags are stripped from the output. Instead, the |
- Deferred fires a flat list of results containing only the results and |
- failures. |
- """ |
- d = util._runSequentially([lambda: self.fail('foo'), lambda: 'bar'], |
- stripFlags=True) |
- def check(results): |
- [fail, result] = results |
- fail.trap(self.failureException) |
- self.assertEqual(fail.getErrorMessage(), 'foo') |
- self.assertEqual(result, 'bar') |
- return d.addCallback(check) |
- test_stripFlags.todo = "YAGNI" |
- |
- |
- |
-class DirtyReactorAggregateErrorTest(TestCase): |
- """ |
- Tests for the L{DirtyReactorAggregateError}. |
- """ |
- |
- def test_formatDelayedCall(self): |
- """ |
- Delayed calls are formatted nicely. |
- """ |
- error = DirtyReactorAggregateError(["Foo", "bar"]) |
- self.assertEquals(str(error), |
- """\ |
-Reactor was unclean. |
-DelayedCalls: (set twisted.internet.base.DelayedCall.debug = True to debug) |
-Foo |
-bar""") |
- |
- |
- def test_formatSelectables(self): |
- """ |
- Selectables are formatted nicely. |
- """ |
- error = DirtyReactorAggregateError([], ["selectable 1", "selectable 2"]) |
- self.assertEquals(str(error), |
- """\ |
-Reactor was unclean. |
-Selectables: |
-selectable 1 |
-selectable 2""") |
- |
- |
- def test_formatDelayedCallsAndSelectables(self): |
- """ |
- Both delayed calls and selectables can appear in the same error. |
- """ |
- error = DirtyReactorAggregateError(["bleck", "Boozo"], |
- ["Sel1", "Sel2"]) |
- self.assertEquals(str(error), |
- """\ |
-Reactor was unclean. |
-DelayedCalls: (set twisted.internet.base.DelayedCall.debug = True to debug) |
-bleck |
-Boozo |
-Selectables: |
-Sel1 |
-Sel2""") |
- |
- |
- |
-class StubReactor(object): |
- """ |
- A reactor stub which contains enough functionality to be used with the |
- L{_Janitor}. |
- |
- @ivar iterations: A list of the arguments passed to L{iterate}. |
- @ivar removeAllCalled: Number of times that L{removeAll} was called. |
- @ivar selectables: The value that will be returned from L{removeAll}. |
- @ivar delayedCalls: The value to return from L{getDelayedCalls}. |
- """ |
- |
- def __init__(self, delayedCalls, selectables=None): |
- """ |
- @param delayedCalls: See L{StubReactor.delayedCalls}. |
- @param selectables: See L{StubReactor.selectables}. |
- """ |
- self.delayedCalls = delayedCalls |
- self.iterations = [] |
- self.removeAllCalled = 0 |
- if not selectables: |
- selectables = [] |
- self.selectables = selectables |
- |
- |
- def iterate(self, timeout=None): |
- """ |
- Increment C{self.iterations}. |
- """ |
- self.iterations.append(timeout) |
- |
- |
- def getDelayedCalls(self): |
- """ |
- Return C{self.delayedCalls}. |
- """ |
- return self.delayedCalls |
- |
- |
- def removeAll(self): |
- """ |
- Increment C{self.removeAllCalled} and return C{self.selectables}. |
- """ |
- self.removeAllCalled += 1 |
- return self.selectables |
- |
- |
- |
-class StubErrorReporter(object): |
- """ |
- A subset of L{twisted.trial.itrial.IReporter} which records L{addError} |
- calls. |
- |
- @ivar errors: List of two-tuples of (test, error) which were passed to |
- L{addError}. |
- """ |
- |
- def __init__(self): |
- self.errors = [] |
- |
- |
- def addError(self, test, error): |
- """ |
- Record parameters in C{self.errors}. |
- """ |
- self.errors.append((test, error)) |
- |
- |
- |
-class JanitorTests(TestCase): |
- """ |
- Tests for L{_Janitor}! |
- """ |
- |
- def test_cleanPendingSpinsReactor(self): |
- """ |
- During pending-call cleanup, the reactor will be spun twice with an |
- instant timeout. This is not a requirement, it is only a test for |
- current behavior. Hopefully Trial will eventually not do this kind of |
- reactor stuff. |
- """ |
- reactor = StubReactor([]) |
- jan = _Janitor(None, None, reactor=reactor) |
- jan._cleanPending() |
- self.assertEquals(reactor.iterations, [0, 0]) |
- |
- |
- def test_cleanPendingCancelsCalls(self): |
- """ |
- During pending-call cleanup, the janitor cancels pending timed calls. |
- """ |
- def func(): |
- return "Lulz" |
- cancelled = [] |
- delayedCall = DelayedCall(300, func, (), {}, |
- cancelled.append, lambda x: None) |
- reactor = StubReactor([delayedCall]) |
- jan = _Janitor(None, None, reactor=reactor) |
- jan._cleanPending() |
- self.assertEquals(cancelled, [delayedCall]) |
- |
- |
- def test_cleanPendingReturnsDelayedCallStrings(self): |
- """ |
- The Janitor produces string representations of delayed calls from the |
- delayed call cleanup method. It gets the string representations |
- *before* cancelling the calls; this is important because cancelling the |
- call removes critical debugging information from the string |
- representation. |
- """ |
- delayedCall = DelayedCall(300, lambda: None, (), {}, |
- lambda x: None, lambda x: None, |
- seconds=lambda: 0) |
- delayedCallString = str(delayedCall) |
- reactor = StubReactor([delayedCall]) |
- jan = _Janitor(None, None, reactor=reactor) |
- strings = jan._cleanPending() |
- self.assertEquals(strings, [delayedCallString]) |
- |
- |
- def test_cleanReactorRemovesSelectables(self): |
- """ |
- The Janitor will remove selectables during reactor cleanup. |
- """ |
- reactor = StubReactor([]) |
- jan = _Janitor(None, None, reactor=reactor) |
- jan._cleanReactor() |
- self.assertEquals(reactor.removeAllCalled, 1) |
- |
- |
- def test_cleanReactorKillsProcesses(self): |
- """ |
- The Janitor will kill processes during reactor cleanup. |
- """ |
- class StubProcessTransport(object): |
- """ |
- A stub L{IProcessTransport} provider which records signals. |
- @ivar signals: The signals passed to L{signalProcess}. |
- """ |
- implements(IProcessTransport) |
- |
- def __init__(self): |
- self.signals = [] |
- |
- def signalProcess(self, signal): |
- """ |
- Append C{signal} to C{self.signals}. |
- """ |
- self.signals.append(signal) |
- |
- pt = StubProcessTransport() |
- reactor = StubReactor([], [pt]) |
- jan = _Janitor(None, None, reactor=reactor) |
- jan._cleanReactor() |
- self.assertEquals(pt.signals, ["KILL"]) |
- |
- |
- def test_cleanReactorReturnsSelectableStrings(self): |
- """ |
- The Janitor returns string representations of the selectables that it |
- cleaned up from the reactor cleanup method. |
- """ |
- class Selectable(object): |
- """ |
- A stub Selectable which only has an interesting string |
- representation. |
- """ |
- def __repr__(self): |
- return "(SELECTABLE!)" |
- |
- reactor = StubReactor([], [Selectable()]) |
- jan = _Janitor(None, None, reactor=reactor) |
- self.assertEquals(jan._cleanReactor(), ["(SELECTABLE!)"]) |
- |
- |
- def test_postCaseCleanupNoErrors(self): |
- """ |
- The post-case cleanup method will return True and not call C{addError} |
- on the result if there are no pending calls. |
- """ |
- reactor = StubReactor([]) |
- test = object() |
- reporter = StubErrorReporter() |
- jan = _Janitor(test, reporter, reactor=reactor) |
- self.assertTrue(jan.postCaseCleanup()) |
- self.assertEquals(reporter.errors, []) |
- |
- |
- def test_postCaseCleanupWithErrors(self): |
- """ |
- The post-case cleanup method will return False and call C{addError} on |
- the result with a L{DirtyReactorAggregateError} Failure if there are |
- pending calls. |
- """ |
- delayedCall = DelayedCall(300, lambda: None, (), {}, |
- lambda x: None, lambda x: None, |
- seconds=lambda: 0) |
- delayedCallString = str(delayedCall) |
- reactor = StubReactor([delayedCall], []) |
- test = object() |
- reporter = StubErrorReporter() |
- jan = _Janitor(test, reporter, reactor=reactor) |
- self.assertFalse(jan.postCaseCleanup()) |
- self.assertEquals(len(reporter.errors), 1) |
- self.assertEquals(reporter.errors[0][1].value.delayedCalls, |
- [delayedCallString]) |
- |
- |
- def test_postClassCleanupNoErrors(self): |
- """ |
- The post-class cleanup method will not call C{addError} on the result |
- if there are no pending calls or selectables. |
- """ |
- reactor = StubReactor([]) |
- test = object() |
- reporter = StubErrorReporter() |
- jan = _Janitor(test, reporter, reactor=reactor) |
- jan.postClassCleanup() |
- self.assertEquals(reporter.errors, []) |
- |
- |
- def test_postClassCleanupWithPendingCallErrors(self): |
- """ |
- The post-class cleanup method call C{addError} on the result with a |
- L{DirtyReactorAggregateError} Failure if there are pending calls. |
- """ |
- delayedCall = DelayedCall(300, lambda: None, (), {}, |
- lambda x: None, lambda x: None, |
- seconds=lambda: 0) |
- delayedCallString = str(delayedCall) |
- reactor = StubReactor([delayedCall], []) |
- test = object() |
- reporter = StubErrorReporter() |
- jan = _Janitor(test, reporter, reactor=reactor) |
- jan.postClassCleanup() |
- self.assertEquals(len(reporter.errors), 1) |
- self.assertEquals(reporter.errors[0][1].value.delayedCalls, |
- [delayedCallString]) |
- |
- |
- def test_postClassCleanupWithSelectableErrors(self): |
- """ |
- The post-class cleanup method call C{addError} on the result with a |
- L{DirtyReactorAggregateError} Failure if there are selectables. |
- """ |
- selectable = "SELECTABLE HERE" |
- reactor = StubReactor([], [selectable]) |
- test = object() |
- reporter = StubErrorReporter() |
- jan = _Janitor(test, reporter, reactor=reactor) |
- jan.postClassCleanup() |
- self.assertEquals(len(reporter.errors), 1) |
- self.assertEquals(reporter.errors[0][1].value.selectables, |
- [repr(selectable)]) |
- |