| OLD | NEW |
| (Empty) |
| 1 | |
| 2 from twisted.internet import reactor, defer, task | |
| 3 from twisted.trial import unittest | |
| 4 | |
| 5 | |
| 6 class TestCooperator(unittest.TestCase): | |
| 7 RESULT = 'done' | |
| 8 | |
| 9 def ebIter(self, err): | |
| 10 err.trap(task.SchedulerStopped) | |
| 11 return self.RESULT | |
| 12 | |
| 13 | |
| 14 def cbIter(self, ign): | |
| 15 self.fail() | |
| 16 | |
| 17 | |
| 18 def testStoppedRejectsNewTasks(self): | |
| 19 """ | |
| 20 Test that Cooperators refuse new tasks when they have been stopped. | |
| 21 """ | |
| 22 def testwith(stuff): | |
| 23 c = task.Cooperator() | |
| 24 c.stop() | |
| 25 d = c.coiterate(iter(()), stuff) | |
| 26 d.addCallback(self.cbIter) | |
| 27 d.addErrback(self.ebIter) | |
| 28 return d.addCallback(lambda result: | |
| 29 self.assertEquals(result, self.RESULT)) | |
| 30 return testwith(None).addCallback(lambda ign: testwith(defer.Deferred())
) | |
| 31 | |
| 32 | |
| 33 def testStopRunning(self): | |
| 34 """ | |
| 35 Test that a running iterator will not run to completion when the | |
| 36 cooperator is stopped. | |
| 37 """ | |
| 38 c = task.Cooperator() | |
| 39 def myiter(): | |
| 40 for myiter.value in range(3): | |
| 41 yield myiter.value | |
| 42 myiter.value = -1 | |
| 43 d = c.coiterate(myiter()) | |
| 44 d.addCallback(self.cbIter) | |
| 45 d.addErrback(self.ebIter) | |
| 46 c.stop() | |
| 47 def doasserts(result): | |
| 48 self.assertEquals(result, self.RESULT) | |
| 49 self.assertEquals(myiter.value, -1) | |
| 50 d.addCallback(doasserts) | |
| 51 return d | |
| 52 | |
| 53 | |
| 54 def testStopOutstanding(self): | |
| 55 """ | |
| 56 Test that a running iterator paused on a third-party Deferred will | |
| 57 properly stop when .stop() is called. | |
| 58 """ | |
| 59 testControlD = defer.Deferred() | |
| 60 outstandingD = defer.Deferred() | |
| 61 def myiter(): | |
| 62 reactor.callLater(0, testControlD.callback, None) | |
| 63 yield outstandingD | |
| 64 self.fail() | |
| 65 c = task.Cooperator() | |
| 66 d = c.coiterate(myiter()) | |
| 67 def stopAndGo(ign): | |
| 68 c.stop() | |
| 69 outstandingD.callback('arglebargle') | |
| 70 | |
| 71 testControlD.addCallback(stopAndGo) | |
| 72 d.addCallback(self.cbIter) | |
| 73 d.addErrback(self.ebIter) | |
| 74 | |
| 75 return d.addCallback(lambda result: self.assertEquals(result, self.RESUL
T)) | |
| 76 | |
| 77 | |
| 78 def testUnexpectedError(self): | |
| 79 c = task.Cooperator() | |
| 80 def myiter(): | |
| 81 if 0: | |
| 82 yield None | |
| 83 else: | |
| 84 raise RuntimeError() | |
| 85 d = c.coiterate(myiter()) | |
| 86 return self.assertFailure(d, RuntimeError) | |
| 87 | |
| 88 | |
| 89 def testUnexpectedErrorActuallyLater(self): | |
| 90 def myiter(): | |
| 91 D = defer.Deferred() | |
| 92 reactor.callLater(0, D.errback, RuntimeError()) | |
| 93 yield D | |
| 94 | |
| 95 c = task.Cooperator() | |
| 96 d = c.coiterate(myiter()) | |
| 97 return self.assertFailure(d, RuntimeError) | |
| 98 | |
| 99 | |
| 100 def testUnexpectedErrorNotActuallyLater(self): | |
| 101 def myiter(): | |
| 102 yield defer.fail(RuntimeError()) | |
| 103 | |
| 104 c = task.Cooperator() | |
| 105 d = c.coiterate(myiter()) | |
| 106 return self.assertFailure(d, RuntimeError) | |
| 107 | |
| 108 | |
| 109 def testCooperation(self): | |
| 110 L = [] | |
| 111 def myiter(things): | |
| 112 for th in things: | |
| 113 L.append(th) | |
| 114 yield None | |
| 115 | |
| 116 groupsOfThings = ['abc', (1, 2, 3), 'def', (4, 5, 6)] | |
| 117 | |
| 118 c = task.Cooperator() | |
| 119 tasks = [] | |
| 120 for stuff in groupsOfThings: | |
| 121 tasks.append(c.coiterate(myiter(stuff))) | |
| 122 | |
| 123 return defer.DeferredList(tasks).addCallback( | |
| 124 lambda ign: self.assertEquals(tuple(L), sum(zip(*groupsOfThings), ()
))) | |
| 125 | |
| 126 | |
| 127 def testResourceExhaustion(self): | |
| 128 output = [] | |
| 129 def myiter(): | |
| 130 for i in range(100): | |
| 131 output.append(i) | |
| 132 if i == 9: | |
| 133 _TPF.stopped = True | |
| 134 yield i | |
| 135 | |
| 136 class _TPF: | |
| 137 stopped = False | |
| 138 def __call__(self): | |
| 139 return self.stopped | |
| 140 | |
| 141 c = task.Cooperator(terminationPredicateFactory=_TPF) | |
| 142 c.coiterate(myiter()).addErrback(self.ebIter) | |
| 143 c._delayedCall.cancel() | |
| 144 # testing a private method because only the test case will ever care | |
| 145 # about this, so we have to carefully clean up after ourselves. | |
| 146 c._tick() | |
| 147 c.stop() | |
| 148 self.failUnless(_TPF.stopped) | |
| 149 self.assertEquals(output, range(10)) | |
| 150 | |
| 151 | |
| 152 def testCallbackReCoiterate(self): | |
| 153 """ | |
| 154 If a callback to a deferred returned by coiterate calls coiterate on | |
| 155 the same Cooperator, we should make sure to only do the minimal amount | |
| 156 of scheduling work. (This test was added to demonstrate a specific bug | |
| 157 that was found while writing the scheduler.) | |
| 158 """ | |
| 159 calls = [] | |
| 160 | |
| 161 class FakeCall: | |
| 162 def __init__(self, func): | |
| 163 self.func = func | |
| 164 | |
| 165 def __repr__(self): | |
| 166 return '<FakeCall %r>' % (self.func,) | |
| 167 | |
| 168 def sched(f): | |
| 169 self.failIf(calls, repr(calls)) | |
| 170 calls.append(FakeCall(f)) | |
| 171 return calls[-1] | |
| 172 | |
| 173 c = task.Cooperator(scheduler=sched, terminationPredicateFactory=lambda:
lambda: True) | |
| 174 d = c.coiterate(iter(())) | |
| 175 | |
| 176 done = [] | |
| 177 def anotherTask(ign): | |
| 178 c.coiterate(iter(())).addBoth(done.append) | |
| 179 | |
| 180 d.addCallback(anotherTask) | |
| 181 | |
| 182 work = 0 | |
| 183 while not done: | |
| 184 work += 1 | |
| 185 while calls: | |
| 186 calls.pop(0).func() | |
| 187 work += 1 | |
| 188 if work > 50: | |
| 189 self.fail("Cooperator took too long") | |
| OLD | NEW |