OLD | NEW |
| (Empty) |
1 # Copyright (c) 2001-2004 Twisted Matrix Laboratories. | |
2 # See LICENSE for details. | |
3 | |
4 # | |
5 # Author: Clark C. Evans | |
6 # | |
7 | |
8 from __future__ import nested_scopes | |
9 from __future__ import generators | |
10 | |
11 from twisted.flow import flow | |
12 from twisted.flow.threads import Threaded, QueryIterator | |
13 from twisted.trial import unittest | |
14 from twisted.python import failure | |
15 from twisted.internet import defer, reactor, protocol, interfaces | |
16 from time import sleep | |
17 | |
18 class slowlist: | |
19 """ this is a generator based list | |
20 | |
21 def slowlist(list): | |
22 list = list[:] | |
23 while list: | |
24 yield list.pop(0) | |
25 | |
26 It is primarly used to simulate generators by using | |
27 a list (for testing purposes) without being wrapped | |
28 as a flow.List, which has all kinds of shortcuts we | |
29 don't want for testing. | |
30 """ | |
31 def __init__(self, list): | |
32 self.list = list[:] | |
33 def __iter__(self): | |
34 return self | |
35 def next(self): | |
36 if self.list: | |
37 return self.list.pop(0) | |
38 raise StopIteration | |
39 | |
40 _onetwothree = ['one','two',flow.Cooperate(),'three'] | |
41 | |
42 class producer: | |
43 """ iterator version of the following generator... | |
44 | |
45 def producer(): | |
46 lst = flow.wrap(slowlist([1,2,3])) | |
47 nam = flow.wrap(slowlist(_onetwothree)) | |
48 while True: | |
49 yield lst | |
50 yield nam | |
51 yield (lst.next(),nam.next()) | |
52 | |
53 """ | |
54 def __iter__(self): | |
55 self.lst = flow.wrap(slowlist([1,2,3])) | |
56 self.nam = flow.wrap(slowlist(_onetwothree)) | |
57 self.next = self.yield_lst | |
58 return self | |
59 def yield_lst(self): | |
60 self.next = self.yield_nam | |
61 return self.lst | |
62 def yield_nam(self): | |
63 self.next = self.yield_results | |
64 return self.nam | |
65 def yield_results(self): | |
66 self.next = self.yield_lst | |
67 return (self.lst.next(), self.nam.next()) | |
68 | |
69 class consumer: | |
70 """ iterator version of the following generator... | |
71 | |
72 def consumer(): | |
73 title = flow.wrap(['Title']) | |
74 prod = flow.wrap(producer()) | |
75 yield title | |
76 yield title.next() | |
77 yield prod | |
78 for data in prod: | |
79 yield data | |
80 yield prod | |
81 """ | |
82 def __iter__(self): | |
83 self.title = flow.wrap(['Title']) | |
84 self.lst = flow.wrap(producer()) | |
85 self.next = self.yield_title | |
86 return self | |
87 def yield_title(self): | |
88 self.next = self.yield_title_result | |
89 return self.title | |
90 def yield_title_result(self): | |
91 self.next = self.yield_lst | |
92 return self.title.next() | |
93 def yield_lst(self): | |
94 self.next = self.yield_result | |
95 return self.lst | |
96 def yield_result(self): | |
97 self.next = self.yield_lst | |
98 return self.lst.next() | |
99 | |
100 | |
101 class badgen: | |
102 """ a bad generator... | |
103 | |
104 def badgen(): | |
105 yield 'x' | |
106 err = 3/ 0 | |
107 """ | |
108 def __iter__(self): | |
109 self.next = self.yield_x | |
110 return self | |
111 def yield_x(self): | |
112 self.next = self.yield_done | |
113 return 'x' | |
114 def yield_done(self): | |
115 err = 3 / 0 | |
116 raise StopIteration | |
117 | |
118 class buildlist: | |
119 """ building a list | |
120 | |
121 def buildlist(src): | |
122 out = [] | |
123 yield src | |
124 for itm in src: | |
125 out.append(itm) | |
126 yield src | |
127 yield out | |
128 """ | |
129 def __init__(self, src): | |
130 self.src = src | |
131 def __iter__(self): | |
132 self.out = [] | |
133 self.next = self.yield_src | |
134 return self | |
135 def yield_src(self): | |
136 self.next = self.yield_append | |
137 return self.src | |
138 def yield_append(self): | |
139 try: | |
140 self.out.append(self.src.next()) | |
141 except StopIteration: | |
142 self.next = self.yield_finish | |
143 return self.out | |
144 return self.src | |
145 def yield_finish(self): | |
146 raise StopIteration | |
147 | |
148 class testconcur: | |
149 """ interweving two concurrent stages | |
150 | |
151 def testconcur(*stages): | |
152 both = flow.Concurrent(*stages) | |
153 yield both | |
154 for stage in both: | |
155 yield (stage.name, stage.result) | |
156 yield both | |
157 """ | |
158 def __init__(self, *stages): | |
159 self.both = flow.Concurrent(*stages) | |
160 def __iter__(self): | |
161 self.next = self.yield_both | |
162 return self | |
163 def yield_both(self): | |
164 self.next = self.yield_result | |
165 return self.both | |
166 def yield_result(self): | |
167 self.next = self.yield_both | |
168 stage = self.both.next() | |
169 return (stage.name, stage.next()) | |
170 | |
171 class echoServer: | |
172 """ a simple echo protocol, server side | |
173 | |
174 def echoServer(conn): | |
175 yield conn | |
176 for data in conn: | |
177 yield data | |
178 yield conn | |
179 """ | |
180 def __init__(self, conn): | |
181 self.conn = conn | |
182 def __iter__(self): | |
183 self.next = self.yield_conn | |
184 return self | |
185 def yield_conn(self): | |
186 self.next = self.yield_data | |
187 return self.conn | |
188 def yield_data(self): | |
189 self.next = self.yield_conn | |
190 return self.conn.next() | |
191 | |
192 class echoClient: | |
193 """ a simple echo client tester | |
194 | |
195 def echoClient(conn): | |
196 yield "testing" | |
197 yield conn | |
198 # signal that we are done | |
199 conn.d.callback(conn.next()) | |
200 """ | |
201 def __init__(self, conn): | |
202 self.conn = conn | |
203 def __iter__(self): | |
204 self.next = self.yield_testing | |
205 return self | |
206 def yield_testing(self): | |
207 self.next = self.yield_conn | |
208 return "testing" | |
209 def yield_conn(self): | |
210 self.next = self.yield_stop | |
211 return self.conn | |
212 def yield_stop(self): | |
213 # signal that we are done | |
214 self.conn.factory.d.callback(self.conn.next()) | |
215 raise StopIteration() | |
216 | |
217 class CountIterator: | |
218 def __init__(self, count): | |
219 self.count = count | |
220 def __iter__(self): | |
221 return self | |
222 def next(self): # this is run in a separate thread | |
223 sleep(.1) | |
224 val = self.count | |
225 if not(val): | |
226 raise StopIteration | |
227 self.count -= 1 | |
228 return val | |
229 | |
230 class FlowTest(unittest.TestCase): | |
231 def testNotReady(self): | |
232 x = flow.wrap([1,2,3]) | |
233 self.assertRaises(flow.NotReadyError,x.next) | |
234 | |
235 def testBasic(self): | |
236 lhs = ['string'] | |
237 rhs = list(flow.Block('string')) | |
238 self.assertEqual(lhs,rhs) | |
239 | |
240 def testBasicList(self): | |
241 lhs = [1,2,3] | |
242 rhs = list(flow.Block([1,2,flow.Cooperate(),3])) | |
243 self.assertEqual(lhs,rhs) | |
244 | |
245 def testBasicIterator(self): | |
246 lhs = ['one','two','three'] | |
247 rhs = list(flow.Block(slowlist(_onetwothree))) | |
248 self.assertEqual(lhs,rhs) | |
249 | |
250 def testCallable(self): | |
251 lhs = ['one','two','three'] | |
252 rhs = list(flow.Block(slowlist(_onetwothree))) | |
253 self.assertEqual(lhs,rhs) | |
254 | |
255 def testProducer(self): | |
256 lhs = [(1,'one'),(2,'two'),(3,'three')] | |
257 rhs = list(flow.Block(producer())) | |
258 self.assertEqual(lhs,rhs) | |
259 | |
260 def testConsumer(self): | |
261 lhs = ['Title',(1,'one'),(2,'two'),(3,'three')] | |
262 rhs = list(flow.Block(consumer)) | |
263 self.assertEqual(lhs,rhs) | |
264 | |
265 def testFailure(self): | |
266 self.assertRaises(ZeroDivisionError, list, flow.Block(badgen())) | |
267 self.assertEqual(['x',ZeroDivisionError], | |
268 list(flow.Block(badgen(),ZeroDivisionError))) | |
269 self.assertEqual(['x',ZeroDivisionError], | |
270 list(flow.Block(flow.wrap(badgen()), | |
271 ZeroDivisionError))) | |
272 | |
273 def testZip(self): | |
274 lhs = [(1,'a'),(2,'b'),(3,None)] | |
275 mrg = flow.Zip([1,2,flow.Cooperate(),3],['a','b']) | |
276 rhs = list(flow.Block(mrg)) | |
277 self.assertEqual(lhs,rhs) | |
278 | |
279 def testMerge(self): | |
280 lhs = ['one', 1, 'two', 2, 3, 'three'] | |
281 mrg = flow.Merge(slowlist(_onetwothree),slowlist([1,2,3])) | |
282 rhs = list(flow.Block(mrg)) | |
283 self.assertEqual(lhs,rhs) | |
284 | |
285 def testFilter(self): | |
286 def odd(val): | |
287 if val % 2: | |
288 return True | |
289 lhs = [ 1, 3 ] | |
290 mrg = flow.Filter(odd,slowlist([1,2,flow.Cooperate(),3])) | |
291 rhs = list(flow.Block(mrg)) | |
292 self.assertEqual(lhs,rhs) | |
293 | |
294 def testLineBreak(self): | |
295 lhs = [ "Hello World", "Happy Days Are Here" ] | |
296 rhs = ["Hello ","World\nHappy", flow.Cooperate(), | |
297 " Days"," Are Here\n"] | |
298 mrg = flow.LineBreak(slowlist(rhs), delimiter='\n') | |
299 rhs = list(flow.Block(mrg)) | |
300 self.assertEqual(lhs,rhs) | |
301 | |
302 def testDeferred(self): | |
303 lhs = ['Title', (1,'one'),(2,'two'),(3,'three')] | |
304 d = flow.Deferred(consumer()) | |
305 d.addCallback(self.assertEquals, lhs) | |
306 return d | |
307 | |
308 def testBuildList(self): | |
309 src = flow.wrap([1,2,3]) | |
310 out = flow.Block(buildlist(src)).next() | |
311 self.assertEquals(out,[1,2,3]) | |
312 | |
313 def testDeferredFailure(self): | |
314 d = flow.Deferred(badgen()) | |
315 return self.assertFailure(d, ZeroDivisionError) | |
316 | |
317 def testDeferredTrap(self): | |
318 d = flow.Deferred(badgen(), ZeroDivisionError) | |
319 d.addCallback(self.assertEqual, ['x', ZeroDivisionError]) | |
320 return d | |
321 | |
322 def testZipFailure(self): | |
323 lhs = [(1,'a'),(2,'b'),(3,'c')] | |
324 mrg = flow.Zip([1,2,flow.Cooperate(),3],badgen()) | |
325 d = flow.Deferred(mrg) | |
326 return self.assertFailure(d, ZeroDivisionError) | |
327 | |
328 def testDeferredWrapper(self): | |
329 a = defer.Deferred() | |
330 reactor.callLater(0, lambda: a.callback("test")) | |
331 b = flow.Merge(a, slowlist([1,2,flow.Cooperate(),3])) | |
332 d = flow.Deferred(b) | |
333 d.addCallback(self.assertEqual, [1, 2, 'test', 3]) | |
334 return d | |
335 | |
336 def testDeferredWrapperImmediate(self): | |
337 from twisted.internet import defer | |
338 a = defer.Deferred() | |
339 a.callback("test") | |
340 self.assertEquals(["test"], list(flow.Block(a))) | |
341 | |
342 def testDeferredWrapperFail(self): | |
343 d = defer.Deferred() | |
344 f = lambda: d.errback(flow.Failure(IOError())) | |
345 reactor.callLater(0, f) | |
346 return self.assertFailure(d, IOError) | |
347 | |
348 def testCallback(self): | |
349 cb = flow.Callback() | |
350 d = flow.Deferred(buildlist(cb)) | |
351 for x in range(9): | |
352 cb.result(x) | |
353 cb.finish() | |
354 d.addCallback(self.assertEqual, [range(9)]) | |
355 return d | |
356 | |
357 def testCallbackFailure(self): | |
358 cb = flow.Callback() | |
359 d = flow.Deferred(buildlist(cb)) | |
360 for x in range(3): | |
361 cb.result(x) | |
362 cb.errback(flow.Failure(IOError())) | |
363 return self.assertFailure(d, IOError) | |
364 | |
365 def testConcurrentCallback(self): | |
366 ca = flow.Callback() | |
367 ca.name = 'a' | |
368 cb = flow.Callback() | |
369 cb.name = 'b' | |
370 d = flow.Deferred(testconcur(ca,cb)) | |
371 ca.result(1) | |
372 cb.result(2) | |
373 ca.result(3) | |
374 ca.result(4) | |
375 ca.finish() | |
376 cb.result(5) | |
377 cb.finish() | |
378 d.addCallback(self.assertEquals, | |
379 [('a',1),('b',2),('a',3),('a',4),('b',5)]) | |
380 return d | |
381 | |
382 def testProtocolLocalhost(self): | |
383 # this fails if parallel tests are run on the same box | |
384 server = protocol.ServerFactory() | |
385 server.protocol = flow.Protocol | |
386 server.protocol.controller = echoServer | |
387 port = reactor.listenTCP(0, server) | |
388 client = protocol.ClientFactory() | |
389 client.protocol = flow.makeProtocol(echoClient) | |
390 client.d = defer.Deferred() | |
391 reactor.connectTCP("127.0.0.1", port.getHost().port, client) | |
392 client.d.addCallback(self.assertEquals, 'testing') | |
393 client.d.addBoth(lambda x : | |
394 client.protocol.transport.loseConnection()) | |
395 client.d.addBoth(lambda x : | |
396 defer.maybeDeferred(port.stopListening)) | |
397 return client.d | |
398 #testProtocolLocalhost.skip = "XXX freezes, fixme" | |
399 | |
400 def testProtocol(self): | |
401 from twisted.protocols import loopback | |
402 server = flow.Protocol() | |
403 server.controller = echoServer | |
404 client = flow.makeProtocol(echoClient)() | |
405 client.factory = protocol.ClientFactory() | |
406 client.factory.d = defer.Deferred() | |
407 d2 = loopback.loopbackAsync(server, client) | |
408 client.factory.d.addCallback(self.assertEquals, 'testing') | |
409 return defer.gatherResults([client.factory.d, d2]) | |
410 | |
411 | |
412 class ThreadedFlowTest(unittest.TestCase): | |
413 if interfaces.IReactorThreads(reactor, None) is None: | |
414 skip = ("No thread support in reactor, " | |
415 "cannot test threaded flow constructs.") | |
416 | |
417 | |
418 def testThreaded(self): | |
419 expect = [5,4,3,2,1] | |
420 d = flow.Deferred(Threaded(CountIterator(5))) | |
421 d.addCallback(self.assertEquals, expect) | |
422 return d | |
423 | |
424 def testThreadedError(self): | |
425 # is this the expected behaviour? | |
426 def iterator(): | |
427 yield 1 | |
428 raise ValueError | |
429 d = flow.Deferred(Threaded(iterator())) | |
430 return self.assertFailure(d, ValueError) | |
431 | |
432 def testThreadedSleep(self): | |
433 expect = [5,4,3,2,1] | |
434 d = flow.Deferred(Threaded(CountIterator(5))) | |
435 sleep(.5) | |
436 d.addCallback(self.assertEquals, expect) | |
437 return d | |
438 | |
439 def testQueryIterator(self): | |
440 try: | |
441 from pyPgSQL import PgSQL | |
442 dbpool = PgSQL | |
443 c = dbpool.connect() | |
444 r = c.cursor() | |
445 r.execute("SELECT 'x'") | |
446 r.fetchone() | |
447 except: | |
448 # PostgreSQL is not installed or bad permissions | |
449 return | |
450 expect = [['one'],['two'],['three']] | |
451 sql = """ | |
452 (SELECT 'one') | |
453 UNION ALL | |
454 (SELECT 'two') | |
455 UNION ALL | |
456 (SELECT 'three') | |
457 """ | |
458 d = flow.Deferred(Threaded(QueryIterator(dbpool, sql))) | |
459 d.addCallback(self.assertEquals, expect) | |
460 return d | |
461 | |
462 def testThreadedImmediate(self): | |
463 """ | |
464 The goal of this test is to test the callback mechanism with | |
465 regard to threads, namely to assure that results can be | |
466 accumulated before they are needed; and that left-over results | |
467 are immediately made available on the next round (even though | |
468 the producing thread has shut down). This is a very tough thing | |
469 to test due to the timing issues. So it may fail on some | |
470 platforms, I'm not sure. | |
471 """ | |
472 expect = [5,4,3,2,1] | |
473 result = [] | |
474 f = Threaded(CountIterator(5)) | |
475 d = defer.Deferred() | |
476 def process(): | |
477 coop = f._yield() | |
478 if f.results: | |
479 result.extend(f.results) | |
480 del f.results[:len(result)] | |
481 reactor.callLater(0, process) | |
482 return | |
483 if coop: | |
484 sleep(.3) | |
485 reactor.callLater(0, coop.callLater, process) | |
486 return | |
487 if f.stop: | |
488 reactor.callLater(0, d.callback, result) | |
489 reactor.callLater(0, process) | |
490 d.addCallback(self.assertEquals, expect) | |
491 return d | |
OLD | NEW |