OLD | NEW |
| (Empty) |
1 | |
2 from twisted.internet import reactor, defer, task | |
3 from twisted.trial import unittest | |
4 | |
5 | |
6 class TestCooperator(unittest.TestCase): | |
7 RESULT = 'done' | |
8 | |
9 def ebIter(self, err): | |
10 err.trap(task.SchedulerStopped) | |
11 return self.RESULT | |
12 | |
13 | |
14 def cbIter(self, ign): | |
15 self.fail() | |
16 | |
17 | |
18 def testStoppedRejectsNewTasks(self): | |
19 """ | |
20 Test that Cooperators refuse new tasks when they have been stopped. | |
21 """ | |
22 def testwith(stuff): | |
23 c = task.Cooperator() | |
24 c.stop() | |
25 d = c.coiterate(iter(()), stuff) | |
26 d.addCallback(self.cbIter) | |
27 d.addErrback(self.ebIter) | |
28 return d.addCallback(lambda result: | |
29 self.assertEquals(result, self.RESULT)) | |
30 return testwith(None).addCallback(lambda ign: testwith(defer.Deferred())
) | |
31 | |
32 | |
33 def testStopRunning(self): | |
34 """ | |
35 Test that a running iterator will not run to completion when the | |
36 cooperator is stopped. | |
37 """ | |
38 c = task.Cooperator() | |
39 def myiter(): | |
40 for myiter.value in range(3): | |
41 yield myiter.value | |
42 myiter.value = -1 | |
43 d = c.coiterate(myiter()) | |
44 d.addCallback(self.cbIter) | |
45 d.addErrback(self.ebIter) | |
46 c.stop() | |
47 def doasserts(result): | |
48 self.assertEquals(result, self.RESULT) | |
49 self.assertEquals(myiter.value, -1) | |
50 d.addCallback(doasserts) | |
51 return d | |
52 | |
53 | |
54 def testStopOutstanding(self): | |
55 """ | |
56 Test that a running iterator paused on a third-party Deferred will | |
57 properly stop when .stop() is called. | |
58 """ | |
59 testControlD = defer.Deferred() | |
60 outstandingD = defer.Deferred() | |
61 def myiter(): | |
62 reactor.callLater(0, testControlD.callback, None) | |
63 yield outstandingD | |
64 self.fail() | |
65 c = task.Cooperator() | |
66 d = c.coiterate(myiter()) | |
67 def stopAndGo(ign): | |
68 c.stop() | |
69 outstandingD.callback('arglebargle') | |
70 | |
71 testControlD.addCallback(stopAndGo) | |
72 d.addCallback(self.cbIter) | |
73 d.addErrback(self.ebIter) | |
74 | |
75 return d.addCallback(lambda result: self.assertEquals(result, self.RESUL
T)) | |
76 | |
77 | |
78 def testUnexpectedError(self): | |
79 c = task.Cooperator() | |
80 def myiter(): | |
81 if 0: | |
82 yield None | |
83 else: | |
84 raise RuntimeError() | |
85 d = c.coiterate(myiter()) | |
86 return self.assertFailure(d, RuntimeError) | |
87 | |
88 | |
89 def testUnexpectedErrorActuallyLater(self): | |
90 def myiter(): | |
91 D = defer.Deferred() | |
92 reactor.callLater(0, D.errback, RuntimeError()) | |
93 yield D | |
94 | |
95 c = task.Cooperator() | |
96 d = c.coiterate(myiter()) | |
97 return self.assertFailure(d, RuntimeError) | |
98 | |
99 | |
100 def testUnexpectedErrorNotActuallyLater(self): | |
101 def myiter(): | |
102 yield defer.fail(RuntimeError()) | |
103 | |
104 c = task.Cooperator() | |
105 d = c.coiterate(myiter()) | |
106 return self.assertFailure(d, RuntimeError) | |
107 | |
108 | |
109 def testCooperation(self): | |
110 L = [] | |
111 def myiter(things): | |
112 for th in things: | |
113 L.append(th) | |
114 yield None | |
115 | |
116 groupsOfThings = ['abc', (1, 2, 3), 'def', (4, 5, 6)] | |
117 | |
118 c = task.Cooperator() | |
119 tasks = [] | |
120 for stuff in groupsOfThings: | |
121 tasks.append(c.coiterate(myiter(stuff))) | |
122 | |
123 return defer.DeferredList(tasks).addCallback( | |
124 lambda ign: self.assertEquals(tuple(L), sum(zip(*groupsOfThings), ()
))) | |
125 | |
126 | |
127 def testResourceExhaustion(self): | |
128 output = [] | |
129 def myiter(): | |
130 for i in range(100): | |
131 output.append(i) | |
132 if i == 9: | |
133 _TPF.stopped = True | |
134 yield i | |
135 | |
136 class _TPF: | |
137 stopped = False | |
138 def __call__(self): | |
139 return self.stopped | |
140 | |
141 c = task.Cooperator(terminationPredicateFactory=_TPF) | |
142 c.coiterate(myiter()).addErrback(self.ebIter) | |
143 c._delayedCall.cancel() | |
144 # testing a private method because only the test case will ever care | |
145 # about this, so we have to carefully clean up after ourselves. | |
146 c._tick() | |
147 c.stop() | |
148 self.failUnless(_TPF.stopped) | |
149 self.assertEquals(output, range(10)) | |
150 | |
151 | |
152 def testCallbackReCoiterate(self): | |
153 """ | |
154 If a callback to a deferred returned by coiterate calls coiterate on | |
155 the same Cooperator, we should make sure to only do the minimal amount | |
156 of scheduling work. (This test was added to demonstrate a specific bug | |
157 that was found while writing the scheduler.) | |
158 """ | |
159 calls = [] | |
160 | |
161 class FakeCall: | |
162 def __init__(self, func): | |
163 self.func = func | |
164 | |
165 def __repr__(self): | |
166 return '<FakeCall %r>' % (self.func,) | |
167 | |
168 def sched(f): | |
169 self.failIf(calls, repr(calls)) | |
170 calls.append(FakeCall(f)) | |
171 return calls[-1] | |
172 | |
173 c = task.Cooperator(scheduler=sched, terminationPredicateFactory=lambda:
lambda: True) | |
174 d = c.coiterate(iter(())) | |
175 | |
176 done = [] | |
177 def anotherTask(ign): | |
178 c.coiterate(iter(())).addBoth(done.append) | |
179 | |
180 d.addCallback(anotherTask) | |
181 | |
182 work = 0 | |
183 while not done: | |
184 work += 1 | |
185 while calls: | |
186 calls.pop(0).func() | |
187 work += 1 | |
188 if work > 50: | |
189 self.fail("Cooperator took too long") | |
OLD | NEW |