OLD | NEW |
| (Empty) |
1 # -*- Python -*- | |
2 # Copyright (c) 2001-2004 Twisted Matrix Laboratories. | |
3 # See LICENSE for details. | |
4 | |
5 | |
6 __version__ = '$Revision: 1.5 $'[11:-2] | |
7 | |
8 from StringIO import StringIO | |
9 from twisted.trial import unittest | |
10 from twisted.protocols import pcp | |
11 | |
12 # Goal: | |
13 | |
14 # Take a Protocol instance. Own all outgoing data - anything that | |
15 # would go to p.transport.write. Own all incoming data - anything | |
16 # that comes to p.dataReceived. | |
17 | |
18 # I need: | |
19 # Something with the AbstractFileDescriptor interface. | |
20 # That is: | |
21 # - acts as a Transport | |
22 # - has a method write() | |
23 # - which buffers | |
24 # - acts as a Consumer | |
25 # - has a registerProducer, unRegisterProducer | |
26 # - tells the Producer to back off (pauseProducing) when its buffer is full. | |
27 # - tells the Producer to resumeProducing when its buffer is not so full. | |
28 # - acts as a Producer | |
29 # - calls registerProducer | |
30 # - calls write() on consumers | |
31 # - honors requests to pause/resume producing | |
32 # - honors stopProducing, and passes it along to upstream Producers | |
33 | |
34 | |
35 class DummyTransport: | |
36 """A dumb transport to wrap around.""" | |
37 | |
38 def __init__(self): | |
39 self._writes = [] | |
40 | |
41 def write(self, data): | |
42 self._writes.append(data) | |
43 | |
44 def getvalue(self): | |
45 return ''.join(self._writes) | |
46 | |
47 class DummyProducer: | |
48 resumed = False | |
49 stopped = False | |
50 paused = False | |
51 | |
52 def __init__(self, consumer): | |
53 self.consumer = consumer | |
54 | |
55 def resumeProducing(self): | |
56 self.resumed = True | |
57 self.paused = False | |
58 | |
59 def pauseProducing(self): | |
60 self.paused = True | |
61 | |
62 def stopProducing(self): | |
63 self.stopped = True | |
64 | |
65 | |
66 class DummyConsumer(DummyTransport): | |
67 producer = None | |
68 finished = False | |
69 unregistered = True | |
70 | |
71 def registerProducer(self, producer, streaming): | |
72 self.producer = (producer, streaming) | |
73 | |
74 def unregisterProducer(self): | |
75 self.unregistered = True | |
76 | |
77 def finish(self): | |
78 self.finished = True | |
79 | |
80 class TransportInterfaceTest(unittest.TestCase): | |
81 proxyClass = pcp.BasicProducerConsumerProxy | |
82 | |
83 def setUp(self): | |
84 self.underlying = DummyConsumer() | |
85 self.transport = self.proxyClass(self.underlying) | |
86 | |
87 def testWrite(self): | |
88 self.transport.write("some bytes") | |
89 | |
90 class ConsumerInterfaceTest: | |
91 """Test ProducerConsumerProxy as a Consumer. | |
92 | |
93 Normally we have ProducingServer -> ConsumingTransport. | |
94 | |
95 If I am to go between (Server -> Shaper -> Transport), I have to | |
96 play the role of Consumer convincingly for the ProducingServer. | |
97 """ | |
98 | |
99 def setUp(self): | |
100 self.underlying = DummyConsumer() | |
101 self.consumer = self.proxyClass(self.underlying) | |
102 self.producer = DummyProducer(self.consumer) | |
103 | |
104 def testRegisterPush(self): | |
105 self.consumer.registerProducer(self.producer, True) | |
106 ## Consumer should NOT have called PushProducer.resumeProducing | |
107 self.failIf(self.producer.resumed) | |
108 | |
109 ## I'm I'm just a proxy, should I only do resumeProducing when | |
110 ## I get poked myself? | |
111 #def testRegisterPull(self): | |
112 # self.consumer.registerProducer(self.producer, False) | |
113 # ## Consumer SHOULD have called PushProducer.resumeProducing | |
114 # self.failUnless(self.producer.resumed) | |
115 | |
116 def testUnregister(self): | |
117 self.consumer.registerProducer(self.producer, False) | |
118 self.consumer.unregisterProducer() | |
119 # Now when the consumer would ordinarily want more data, it | |
120 # shouldn't ask producer for it. | |
121 # The most succinct way to trigger "want more data" is to proxy for | |
122 # a PullProducer and have someone ask me for data. | |
123 self.producer.resumed = False | |
124 self.consumer.resumeProducing() | |
125 self.failIf(self.producer.resumed) | |
126 | |
127 def testFinish(self): | |
128 self.consumer.registerProducer(self.producer, False) | |
129 self.consumer.finish() | |
130 # I guess finish should behave like unregister? | |
131 self.producer.resumed = False | |
132 self.consumer.resumeProducing() | |
133 self.failIf(self.producer.resumed) | |
134 | |
135 | |
136 class ProducerInterfaceTest: | |
137 """Test ProducerConsumerProxy as a Producer. | |
138 | |
139 Normally we have ProducingServer -> ConsumingTransport. | |
140 | |
141 If I am to go between (Server -> Shaper -> Transport), I have to | |
142 play the role of Producer convincingly for the ConsumingTransport. | |
143 """ | |
144 | |
145 def setUp(self): | |
146 self.consumer = DummyConsumer() | |
147 self.producer = self.proxyClass(self.consumer) | |
148 | |
149 def testRegistersProducer(self): | |
150 self.failUnlessEqual(self.consumer.producer[0], self.producer) | |
151 | |
152 def testPause(self): | |
153 self.producer.pauseProducing() | |
154 self.producer.write("yakkity yak") | |
155 self.failIf(self.consumer.getvalue(), | |
156 "Paused producer should not have sent data.") | |
157 | |
158 def testResume(self): | |
159 self.producer.pauseProducing() | |
160 self.producer.resumeProducing() | |
161 self.producer.write("yakkity yak") | |
162 self.failUnlessEqual(self.consumer.getvalue(), "yakkity yak") | |
163 | |
164 def testResumeNoEmptyWrite(self): | |
165 self.producer.pauseProducing() | |
166 self.producer.resumeProducing() | |
167 self.failUnlessEqual(len(self.consumer._writes), 0, | |
168 "Resume triggered an empty write.") | |
169 | |
170 def testResumeBuffer(self): | |
171 self.producer.pauseProducing() | |
172 self.producer.write("buffer this") | |
173 self.producer.resumeProducing() | |
174 self.failUnlessEqual(self.consumer.getvalue(), "buffer this") | |
175 | |
176 def testStop(self): | |
177 self.producer.stopProducing() | |
178 self.producer.write("yakkity yak") | |
179 self.failIf(self.consumer.getvalue(), | |
180 "Stopped producer should not have sent data.") | |
181 | |
182 | |
183 class PCP_ConsumerInterfaceTest(ConsumerInterfaceTest, unittest.TestCase): | |
184 proxyClass = pcp.BasicProducerConsumerProxy | |
185 | |
186 class PCPII_ConsumerInterfaceTest(ConsumerInterfaceTest, unittest.TestCase): | |
187 proxyClass = pcp.ProducerConsumerProxy | |
188 | |
189 class PCP_ProducerInterfaceTest(ProducerInterfaceTest, unittest.TestCase): | |
190 proxyClass = pcp.BasicProducerConsumerProxy | |
191 | |
192 class PCPII_ProducerInterfaceTest(ProducerInterfaceTest, unittest.TestCase): | |
193 proxyClass = pcp.ProducerConsumerProxy | |
194 | |
195 class ProducerProxyTest(unittest.TestCase): | |
196 """Producer methods on me should be relayed to the Producer I proxy. | |
197 """ | |
198 proxyClass = pcp.BasicProducerConsumerProxy | |
199 | |
200 def setUp(self): | |
201 self.proxy = self.proxyClass(None) | |
202 self.parentProducer = DummyProducer(self.proxy) | |
203 self.proxy.registerProducer(self.parentProducer, True) | |
204 | |
205 def testStop(self): | |
206 self.proxy.stopProducing() | |
207 self.failUnless(self.parentProducer.stopped) | |
208 | |
209 | |
210 class ConsumerProxyTest(unittest.TestCase): | |
211 """Consumer methods on me should be relayed to the Consumer I proxy. | |
212 """ | |
213 proxyClass = pcp.BasicProducerConsumerProxy | |
214 | |
215 def setUp(self): | |
216 self.underlying = DummyConsumer() | |
217 self.consumer = self.proxyClass(self.underlying) | |
218 | |
219 def testWrite(self): | |
220 # NOTE: This test only valid for streaming (Push) systems. | |
221 self.consumer.write("some bytes") | |
222 self.failUnlessEqual(self.underlying.getvalue(), "some bytes") | |
223 | |
224 def testFinish(self): | |
225 self.consumer.finish() | |
226 self.failUnless(self.underlying.finished) | |
227 | |
228 def testUnregister(self): | |
229 self.consumer.unregisterProducer() | |
230 self.failUnless(self.underlying.unregistered) | |
231 | |
232 | |
233 class PullProducerTest: | |
234 def setUp(self): | |
235 self.underlying = DummyConsumer() | |
236 self.proxy = self.proxyClass(self.underlying) | |
237 self.parentProducer = DummyProducer(self.proxy) | |
238 self.proxy.registerProducer(self.parentProducer, True) | |
239 | |
240 def testHoldWrites(self): | |
241 self.proxy.write("hello") | |
242 # Consumer should get no data before it says resumeProducing. | |
243 self.failIf(self.underlying.getvalue(), | |
244 "Pulling Consumer got data before it pulled.") | |
245 | |
246 def testPull(self): | |
247 self.proxy.write("hello") | |
248 self.proxy.resumeProducing() | |
249 self.failUnlessEqual(self.underlying.getvalue(), "hello") | |
250 | |
251 def testMergeWrites(self): | |
252 self.proxy.write("hello ") | |
253 self.proxy.write("sunshine") | |
254 self.proxy.resumeProducing() | |
255 nwrites = len(self.underlying._writes) | |
256 self.failUnlessEqual(nwrites, 1, "Pull resulted in %d writes instead " | |
257 "of 1." % (nwrites,)) | |
258 self.failUnlessEqual(self.underlying.getvalue(), "hello sunshine") | |
259 | |
260 | |
261 def testLateWrite(self): | |
262 # consumer sends its initial pull before we have data | |
263 self.proxy.resumeProducing() | |
264 self.proxy.write("data") | |
265 # This data should answer that pull request. | |
266 self.failUnlessEqual(self.underlying.getvalue(), "data") | |
267 | |
268 class PCP_PullProducerTest(PullProducerTest, unittest.TestCase): | |
269 class proxyClass(pcp.BasicProducerConsumerProxy): | |
270 iAmStreaming = False | |
271 | |
272 class PCPII_PullProducerTest(PullProducerTest, unittest.TestCase): | |
273 class proxyClass(pcp.ProducerConsumerProxy): | |
274 iAmStreaming = False | |
275 | |
276 # Buffering! | |
277 | |
278 class BufferedConsumerTest(unittest.TestCase): | |
279 """As a consumer, ask the producer to pause after too much data.""" | |
280 | |
281 proxyClass = pcp.ProducerConsumerProxy | |
282 | |
283 def setUp(self): | |
284 self.underlying = DummyConsumer() | |
285 self.proxy = self.proxyClass(self.underlying) | |
286 self.proxy.bufferSize = 100 | |
287 | |
288 self.parentProducer = DummyProducer(self.proxy) | |
289 self.proxy.registerProducer(self.parentProducer, True) | |
290 | |
291 def testRegisterPull(self): | |
292 self.proxy.registerProducer(self.parentProducer, False) | |
293 ## Consumer SHOULD have called PushProducer.resumeProducing | |
294 self.failUnless(self.parentProducer.resumed) | |
295 | |
296 def testPauseIntercept(self): | |
297 self.proxy.pauseProducing() | |
298 self.failIf(self.parentProducer.paused) | |
299 | |
300 def testResumeIntercept(self): | |
301 self.proxy.pauseProducing() | |
302 self.proxy.resumeProducing() | |
303 # With a streaming producer, just because the proxy was resumed is | |
304 # not necessarily a reason to resume the parent producer. The state | |
305 # of the buffer should decide that. | |
306 self.failIf(self.parentProducer.resumed) | |
307 | |
308 def testTriggerPause(self): | |
309 """Make sure I say \"when.\"""" | |
310 | |
311 # Pause the proxy so data sent to it builds up in its buffer. | |
312 self.proxy.pauseProducing() | |
313 self.failIf(self.parentProducer.paused, "don't pause yet") | |
314 self.proxy.write("x" * 51) | |
315 self.failIf(self.parentProducer.paused, "don't pause yet") | |
316 self.proxy.write("x" * 51) | |
317 self.failUnless(self.parentProducer.paused) | |
318 | |
319 def testTriggerResume(self): | |
320 """Make sure I resumeProducing when my buffer empties.""" | |
321 self.proxy.pauseProducing() | |
322 self.proxy.write("x" * 102) | |
323 self.failUnless(self.parentProducer.paused, "should be paused") | |
324 self.proxy.resumeProducing() | |
325 # Resuming should have emptied my buffer, so I should tell my | |
326 # parent to resume too. | |
327 self.failIf(self.parentProducer.paused, | |
328 "Producer should have resumed.") | |
329 self.failIf(self.proxy.producerPaused) | |
330 | |
331 class BufferedPullTests(unittest.TestCase): | |
332 class proxyClass(pcp.ProducerConsumerProxy): | |
333 iAmStreaming = False | |
334 | |
335 def _writeSomeData(self, data): | |
336 pcp.ProducerConsumerProxy._writeSomeData(self, data[:100]) | |
337 return min(len(data), 100) | |
338 | |
339 def setUp(self): | |
340 self.underlying = DummyConsumer() | |
341 self.proxy = self.proxyClass(self.underlying) | |
342 self.proxy.bufferSize = 100 | |
343 | |
344 self.parentProducer = DummyProducer(self.proxy) | |
345 self.proxy.registerProducer(self.parentProducer, False) | |
346 | |
347 def testResumePull(self): | |
348 # If proxy has no data to send on resumeProducing, it had better pull | |
349 # some from its PullProducer. | |
350 self.parentProducer.resumed = False | |
351 self.proxy.resumeProducing() | |
352 self.failUnless(self.parentProducer.resumed) | |
353 | |
354 def testLateWriteBuffering(self): | |
355 # consumer sends its initial pull before we have data | |
356 self.proxy.resumeProducing() | |
357 self.proxy.write("datum" * 21) | |
358 # This data should answer that pull request. | |
359 self.failUnlessEqual(self.underlying.getvalue(), "datum" * 20) | |
360 # but there should be some left over | |
361 self.failUnlessEqual(self.proxy._buffer, ["datum"]) | |
362 | |
363 | |
364 # TODO: | |
365 # test that web request finishing bug (when we weren't proxying | |
366 # unregisterProducer but were proxying finish, web file transfers | |
367 # would hang on the last block.) | |
368 # test what happens if writeSomeBytes decided to write zero bytes. | |
OLD | NEW |