Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(37)

Side by Side Diff: third_party/twisted_8_1/twisted/flow/test/test_flow.py

Issue 12261012: Remove third_party/twisted_8_1 (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/build
Patch Set: Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 # Copyright (c) 2001-2004 Twisted Matrix Laboratories.
2 # See LICENSE for details.
3
4 #
5 # Author: Clark C. Evans
6 #
7
8 from __future__ import nested_scopes
9 from __future__ import generators
10
11 from twisted.flow import flow
12 from twisted.flow.threads import Threaded, QueryIterator
13 from twisted.trial import unittest
14 from twisted.python import failure
15 from twisted.internet import defer, reactor, protocol, interfaces
16 from time import sleep
17
18 class slowlist:
19 """ this is a generator based list
20
21 def slowlist(list):
22 list = list[:]
23 while list:
24 yield list.pop(0)
25
26 It is primarly used to simulate generators by using
27 a list (for testing purposes) without being wrapped
28 as a flow.List, which has all kinds of shortcuts we
29 don't want for testing.
30 """
31 def __init__(self, list):
32 self.list = list[:]
33 def __iter__(self):
34 return self
35 def next(self):
36 if self.list:
37 return self.list.pop(0)
38 raise StopIteration
39
40 _onetwothree = ['one','two',flow.Cooperate(),'three']
41
42 class producer:
43 """ iterator version of the following generator...
44
45 def producer():
46 lst = flow.wrap(slowlist([1,2,3]))
47 nam = flow.wrap(slowlist(_onetwothree))
48 while True:
49 yield lst
50 yield nam
51 yield (lst.next(),nam.next())
52
53 """
54 def __iter__(self):
55 self.lst = flow.wrap(slowlist([1,2,3]))
56 self.nam = flow.wrap(slowlist(_onetwothree))
57 self.next = self.yield_lst
58 return self
59 def yield_lst(self):
60 self.next = self.yield_nam
61 return self.lst
62 def yield_nam(self):
63 self.next = self.yield_results
64 return self.nam
65 def yield_results(self):
66 self.next = self.yield_lst
67 return (self.lst.next(), self.nam.next())
68
69 class consumer:
70 """ iterator version of the following generator...
71
72 def consumer():
73 title = flow.wrap(['Title'])
74 prod = flow.wrap(producer())
75 yield title
76 yield title.next()
77 yield prod
78 for data in prod:
79 yield data
80 yield prod
81 """
82 def __iter__(self):
83 self.title = flow.wrap(['Title'])
84 self.lst = flow.wrap(producer())
85 self.next = self.yield_title
86 return self
87 def yield_title(self):
88 self.next = self.yield_title_result
89 return self.title
90 def yield_title_result(self):
91 self.next = self.yield_lst
92 return self.title.next()
93 def yield_lst(self):
94 self.next = self.yield_result
95 return self.lst
96 def yield_result(self):
97 self.next = self.yield_lst
98 return self.lst.next()
99
100
101 class badgen:
102 """ a bad generator...
103
104 def badgen():
105 yield 'x'
106 err = 3/ 0
107 """
108 def __iter__(self):
109 self.next = self.yield_x
110 return self
111 def yield_x(self):
112 self.next = self.yield_done
113 return 'x'
114 def yield_done(self):
115 err = 3 / 0
116 raise StopIteration
117
118 class buildlist:
119 """ building a list
120
121 def buildlist(src):
122 out = []
123 yield src
124 for itm in src:
125 out.append(itm)
126 yield src
127 yield out
128 """
129 def __init__(self, src):
130 self.src = src
131 def __iter__(self):
132 self.out = []
133 self.next = self.yield_src
134 return self
135 def yield_src(self):
136 self.next = self.yield_append
137 return self.src
138 def yield_append(self):
139 try:
140 self.out.append(self.src.next())
141 except StopIteration:
142 self.next = self.yield_finish
143 return self.out
144 return self.src
145 def yield_finish(self):
146 raise StopIteration
147
148 class testconcur:
149 """ interweving two concurrent stages
150
151 def testconcur(*stages):
152 both = flow.Concurrent(*stages)
153 yield both
154 for stage in both:
155 yield (stage.name, stage.result)
156 yield both
157 """
158 def __init__(self, *stages):
159 self.both = flow.Concurrent(*stages)
160 def __iter__(self):
161 self.next = self.yield_both
162 return self
163 def yield_both(self):
164 self.next = self.yield_result
165 return self.both
166 def yield_result(self):
167 self.next = self.yield_both
168 stage = self.both.next()
169 return (stage.name, stage.next())
170
171 class echoServer:
172 """ a simple echo protocol, server side
173
174 def echoServer(conn):
175 yield conn
176 for data in conn:
177 yield data
178 yield conn
179 """
180 def __init__(self, conn):
181 self.conn = conn
182 def __iter__(self):
183 self.next = self.yield_conn
184 return self
185 def yield_conn(self):
186 self.next = self.yield_data
187 return self.conn
188 def yield_data(self):
189 self.next = self.yield_conn
190 return self.conn.next()
191
192 class echoClient:
193 """ a simple echo client tester
194
195 def echoClient(conn):
196 yield "testing"
197 yield conn
198 # signal that we are done
199 conn.d.callback(conn.next())
200 """
201 def __init__(self, conn):
202 self.conn = conn
203 def __iter__(self):
204 self.next = self.yield_testing
205 return self
206 def yield_testing(self):
207 self.next = self.yield_conn
208 return "testing"
209 def yield_conn(self):
210 self.next = self.yield_stop
211 return self.conn
212 def yield_stop(self):
213 # signal that we are done
214 self.conn.factory.d.callback(self.conn.next())
215 raise StopIteration()
216
217 class CountIterator:
218 def __init__(self, count):
219 self.count = count
220 def __iter__(self):
221 return self
222 def next(self): # this is run in a separate thread
223 sleep(.1)
224 val = self.count
225 if not(val):
226 raise StopIteration
227 self.count -= 1
228 return val
229
230 class FlowTest(unittest.TestCase):
231 def testNotReady(self):
232 x = flow.wrap([1,2,3])
233 self.assertRaises(flow.NotReadyError,x.next)
234
235 def testBasic(self):
236 lhs = ['string']
237 rhs = list(flow.Block('string'))
238 self.assertEqual(lhs,rhs)
239
240 def testBasicList(self):
241 lhs = [1,2,3]
242 rhs = list(flow.Block([1,2,flow.Cooperate(),3]))
243 self.assertEqual(lhs,rhs)
244
245 def testBasicIterator(self):
246 lhs = ['one','two','three']
247 rhs = list(flow.Block(slowlist(_onetwothree)))
248 self.assertEqual(lhs,rhs)
249
250 def testCallable(self):
251 lhs = ['one','two','three']
252 rhs = list(flow.Block(slowlist(_onetwothree)))
253 self.assertEqual(lhs,rhs)
254
255 def testProducer(self):
256 lhs = [(1,'one'),(2,'two'),(3,'three')]
257 rhs = list(flow.Block(producer()))
258 self.assertEqual(lhs,rhs)
259
260 def testConsumer(self):
261 lhs = ['Title',(1,'one'),(2,'two'),(3,'three')]
262 rhs = list(flow.Block(consumer))
263 self.assertEqual(lhs,rhs)
264
265 def testFailure(self):
266 self.assertRaises(ZeroDivisionError, list, flow.Block(badgen()))
267 self.assertEqual(['x',ZeroDivisionError],
268 list(flow.Block(badgen(),ZeroDivisionError)))
269 self.assertEqual(['x',ZeroDivisionError],
270 list(flow.Block(flow.wrap(badgen()),
271 ZeroDivisionError)))
272
273 def testZip(self):
274 lhs = [(1,'a'),(2,'b'),(3,None)]
275 mrg = flow.Zip([1,2,flow.Cooperate(),3],['a','b'])
276 rhs = list(flow.Block(mrg))
277 self.assertEqual(lhs,rhs)
278
279 def testMerge(self):
280 lhs = ['one', 1, 'two', 2, 3, 'three']
281 mrg = flow.Merge(slowlist(_onetwothree),slowlist([1,2,3]))
282 rhs = list(flow.Block(mrg))
283 self.assertEqual(lhs,rhs)
284
285 def testFilter(self):
286 def odd(val):
287 if val % 2:
288 return True
289 lhs = [ 1, 3 ]
290 mrg = flow.Filter(odd,slowlist([1,2,flow.Cooperate(),3]))
291 rhs = list(flow.Block(mrg))
292 self.assertEqual(lhs,rhs)
293
294 def testLineBreak(self):
295 lhs = [ "Hello World", "Happy Days Are Here" ]
296 rhs = ["Hello ","World\nHappy", flow.Cooperate(),
297 " Days"," Are Here\n"]
298 mrg = flow.LineBreak(slowlist(rhs), delimiter='\n')
299 rhs = list(flow.Block(mrg))
300 self.assertEqual(lhs,rhs)
301
302 def testDeferred(self):
303 lhs = ['Title', (1,'one'),(2,'two'),(3,'three')]
304 d = flow.Deferred(consumer())
305 d.addCallback(self.assertEquals, lhs)
306 return d
307
308 def testBuildList(self):
309 src = flow.wrap([1,2,3])
310 out = flow.Block(buildlist(src)).next()
311 self.assertEquals(out,[1,2,3])
312
313 def testDeferredFailure(self):
314 d = flow.Deferred(badgen())
315 return self.assertFailure(d, ZeroDivisionError)
316
317 def testDeferredTrap(self):
318 d = flow.Deferred(badgen(), ZeroDivisionError)
319 d.addCallback(self.assertEqual, ['x', ZeroDivisionError])
320 return d
321
322 def testZipFailure(self):
323 lhs = [(1,'a'),(2,'b'),(3,'c')]
324 mrg = flow.Zip([1,2,flow.Cooperate(),3],badgen())
325 d = flow.Deferred(mrg)
326 return self.assertFailure(d, ZeroDivisionError)
327
328 def testDeferredWrapper(self):
329 a = defer.Deferred()
330 reactor.callLater(0, lambda: a.callback("test"))
331 b = flow.Merge(a, slowlist([1,2,flow.Cooperate(),3]))
332 d = flow.Deferred(b)
333 d.addCallback(self.assertEqual, [1, 2, 'test', 3])
334 return d
335
336 def testDeferredWrapperImmediate(self):
337 from twisted.internet import defer
338 a = defer.Deferred()
339 a.callback("test")
340 self.assertEquals(["test"], list(flow.Block(a)))
341
342 def testDeferredWrapperFail(self):
343 d = defer.Deferred()
344 f = lambda: d.errback(flow.Failure(IOError()))
345 reactor.callLater(0, f)
346 return self.assertFailure(d, IOError)
347
348 def testCallback(self):
349 cb = flow.Callback()
350 d = flow.Deferred(buildlist(cb))
351 for x in range(9):
352 cb.result(x)
353 cb.finish()
354 d.addCallback(self.assertEqual, [range(9)])
355 return d
356
357 def testCallbackFailure(self):
358 cb = flow.Callback()
359 d = flow.Deferred(buildlist(cb))
360 for x in range(3):
361 cb.result(x)
362 cb.errback(flow.Failure(IOError()))
363 return self.assertFailure(d, IOError)
364
365 def testConcurrentCallback(self):
366 ca = flow.Callback()
367 ca.name = 'a'
368 cb = flow.Callback()
369 cb.name = 'b'
370 d = flow.Deferred(testconcur(ca,cb))
371 ca.result(1)
372 cb.result(2)
373 ca.result(3)
374 ca.result(4)
375 ca.finish()
376 cb.result(5)
377 cb.finish()
378 d.addCallback(self.assertEquals,
379 [('a',1),('b',2),('a',3),('a',4),('b',5)])
380 return d
381
382 def testProtocolLocalhost(self):
383 # this fails if parallel tests are run on the same box
384 server = protocol.ServerFactory()
385 server.protocol = flow.Protocol
386 server.protocol.controller = echoServer
387 port = reactor.listenTCP(0, server)
388 client = protocol.ClientFactory()
389 client.protocol = flow.makeProtocol(echoClient)
390 client.d = defer.Deferred()
391 reactor.connectTCP("127.0.0.1", port.getHost().port, client)
392 client.d.addCallback(self.assertEquals, 'testing')
393 client.d.addBoth(lambda x :
394 client.protocol.transport.loseConnection())
395 client.d.addBoth(lambda x :
396 defer.maybeDeferred(port.stopListening))
397 return client.d
398 #testProtocolLocalhost.skip = "XXX freezes, fixme"
399
400 def testProtocol(self):
401 from twisted.protocols import loopback
402 server = flow.Protocol()
403 server.controller = echoServer
404 client = flow.makeProtocol(echoClient)()
405 client.factory = protocol.ClientFactory()
406 client.factory.d = defer.Deferred()
407 d2 = loopback.loopbackAsync(server, client)
408 client.factory.d.addCallback(self.assertEquals, 'testing')
409 return defer.gatherResults([client.factory.d, d2])
410
411
412 class ThreadedFlowTest(unittest.TestCase):
413 if interfaces.IReactorThreads(reactor, None) is None:
414 skip = ("No thread support in reactor, "
415 "cannot test threaded flow constructs.")
416
417
418 def testThreaded(self):
419 expect = [5,4,3,2,1]
420 d = flow.Deferred(Threaded(CountIterator(5)))
421 d.addCallback(self.assertEquals, expect)
422 return d
423
424 def testThreadedError(self):
425 # is this the expected behaviour?
426 def iterator():
427 yield 1
428 raise ValueError
429 d = flow.Deferred(Threaded(iterator()))
430 return self.assertFailure(d, ValueError)
431
432 def testThreadedSleep(self):
433 expect = [5,4,3,2,1]
434 d = flow.Deferred(Threaded(CountIterator(5)))
435 sleep(.5)
436 d.addCallback(self.assertEquals, expect)
437 return d
438
439 def testQueryIterator(self):
440 try:
441 from pyPgSQL import PgSQL
442 dbpool = PgSQL
443 c = dbpool.connect()
444 r = c.cursor()
445 r.execute("SELECT 'x'")
446 r.fetchone()
447 except:
448 # PostgreSQL is not installed or bad permissions
449 return
450 expect = [['one'],['two'],['three']]
451 sql = """
452 (SELECT 'one')
453 UNION ALL
454 (SELECT 'two')
455 UNION ALL
456 (SELECT 'three')
457 """
458 d = flow.Deferred(Threaded(QueryIterator(dbpool, sql)))
459 d.addCallback(self.assertEquals, expect)
460 return d
461
462 def testThreadedImmediate(self):
463 """
464 The goal of this test is to test the callback mechanism with
465 regard to threads, namely to assure that results can be
466 accumulated before they are needed; and that left-over results
467 are immediately made available on the next round (even though
468 the producing thread has shut down). This is a very tough thing
469 to test due to the timing issues. So it may fail on some
470 platforms, I'm not sure.
471 """
472 expect = [5,4,3,2,1]
473 result = []
474 f = Threaded(CountIterator(5))
475 d = defer.Deferred()
476 def process():
477 coop = f._yield()
478 if f.results:
479 result.extend(f.results)
480 del f.results[:len(result)]
481 reactor.callLater(0, process)
482 return
483 if coop:
484 sleep(.3)
485 reactor.callLater(0, coop.callLater, process)
486 return
487 if f.stop:
488 reactor.callLater(0, d.callback, result)
489 reactor.callLater(0, process)
490 d.addCallback(self.assertEquals, expect)
491 return d
OLDNEW
« no previous file with comments | « third_party/twisted_8_1/twisted/flow/test/__init__.py ('k') | third_party/twisted_8_1/twisted/flow/threads.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698