| OLD | NEW |
| (Empty) |
| 1 # Copyright (c) 2001-2004 Twisted Matrix Laboratories. | |
| 2 # See LICENSE for details. | |
| 3 | |
| 4 # | |
| 5 # Author: Clark C. Evans | |
| 6 # | |
| 7 | |
| 8 from __future__ import nested_scopes | |
| 9 from __future__ import generators | |
| 10 | |
| 11 from twisted.flow import flow | |
| 12 from twisted.flow.threads import Threaded, QueryIterator | |
| 13 from twisted.trial import unittest | |
| 14 from twisted.python import failure | |
| 15 from twisted.internet import defer, reactor, protocol, interfaces | |
| 16 from time import sleep | |
| 17 | |
| 18 class slowlist: | |
| 19 """ this is a generator based list | |
| 20 | |
| 21 def slowlist(list): | |
| 22 list = list[:] | |
| 23 while list: | |
| 24 yield list.pop(0) | |
| 25 | |
| 26 It is primarly used to simulate generators by using | |
| 27 a list (for testing purposes) without being wrapped | |
| 28 as a flow.List, which has all kinds of shortcuts we | |
| 29 don't want for testing. | |
| 30 """ | |
| 31 def __init__(self, list): | |
| 32 self.list = list[:] | |
| 33 def __iter__(self): | |
| 34 return self | |
| 35 def next(self): | |
| 36 if self.list: | |
| 37 return self.list.pop(0) | |
| 38 raise StopIteration | |
| 39 | |
| 40 _onetwothree = ['one','two',flow.Cooperate(),'three'] | |
| 41 | |
| 42 class producer: | |
| 43 """ iterator version of the following generator... | |
| 44 | |
| 45 def producer(): | |
| 46 lst = flow.wrap(slowlist([1,2,3])) | |
| 47 nam = flow.wrap(slowlist(_onetwothree)) | |
| 48 while True: | |
| 49 yield lst | |
| 50 yield nam | |
| 51 yield (lst.next(),nam.next()) | |
| 52 | |
| 53 """ | |
| 54 def __iter__(self): | |
| 55 self.lst = flow.wrap(slowlist([1,2,3])) | |
| 56 self.nam = flow.wrap(slowlist(_onetwothree)) | |
| 57 self.next = self.yield_lst | |
| 58 return self | |
| 59 def yield_lst(self): | |
| 60 self.next = self.yield_nam | |
| 61 return self.lst | |
| 62 def yield_nam(self): | |
| 63 self.next = self.yield_results | |
| 64 return self.nam | |
| 65 def yield_results(self): | |
| 66 self.next = self.yield_lst | |
| 67 return (self.lst.next(), self.nam.next()) | |
| 68 | |
| 69 class consumer: | |
| 70 """ iterator version of the following generator... | |
| 71 | |
| 72 def consumer(): | |
| 73 title = flow.wrap(['Title']) | |
| 74 prod = flow.wrap(producer()) | |
| 75 yield title | |
| 76 yield title.next() | |
| 77 yield prod | |
| 78 for data in prod: | |
| 79 yield data | |
| 80 yield prod | |
| 81 """ | |
| 82 def __iter__(self): | |
| 83 self.title = flow.wrap(['Title']) | |
| 84 self.lst = flow.wrap(producer()) | |
| 85 self.next = self.yield_title | |
| 86 return self | |
| 87 def yield_title(self): | |
| 88 self.next = self.yield_title_result | |
| 89 return self.title | |
| 90 def yield_title_result(self): | |
| 91 self.next = self.yield_lst | |
| 92 return self.title.next() | |
| 93 def yield_lst(self): | |
| 94 self.next = self.yield_result | |
| 95 return self.lst | |
| 96 def yield_result(self): | |
| 97 self.next = self.yield_lst | |
| 98 return self.lst.next() | |
| 99 | |
| 100 | |
| 101 class badgen: | |
| 102 """ a bad generator... | |
| 103 | |
| 104 def badgen(): | |
| 105 yield 'x' | |
| 106 err = 3/ 0 | |
| 107 """ | |
| 108 def __iter__(self): | |
| 109 self.next = self.yield_x | |
| 110 return self | |
| 111 def yield_x(self): | |
| 112 self.next = self.yield_done | |
| 113 return 'x' | |
| 114 def yield_done(self): | |
| 115 err = 3 / 0 | |
| 116 raise StopIteration | |
| 117 | |
| 118 class buildlist: | |
| 119 """ building a list | |
| 120 | |
| 121 def buildlist(src): | |
| 122 out = [] | |
| 123 yield src | |
| 124 for itm in src: | |
| 125 out.append(itm) | |
| 126 yield src | |
| 127 yield out | |
| 128 """ | |
| 129 def __init__(self, src): | |
| 130 self.src = src | |
| 131 def __iter__(self): | |
| 132 self.out = [] | |
| 133 self.next = self.yield_src | |
| 134 return self | |
| 135 def yield_src(self): | |
| 136 self.next = self.yield_append | |
| 137 return self.src | |
| 138 def yield_append(self): | |
| 139 try: | |
| 140 self.out.append(self.src.next()) | |
| 141 except StopIteration: | |
| 142 self.next = self.yield_finish | |
| 143 return self.out | |
| 144 return self.src | |
| 145 def yield_finish(self): | |
| 146 raise StopIteration | |
| 147 | |
| 148 class testconcur: | |
| 149 """ interweving two concurrent stages | |
| 150 | |
| 151 def testconcur(*stages): | |
| 152 both = flow.Concurrent(*stages) | |
| 153 yield both | |
| 154 for stage in both: | |
| 155 yield (stage.name, stage.result) | |
| 156 yield both | |
| 157 """ | |
| 158 def __init__(self, *stages): | |
| 159 self.both = flow.Concurrent(*stages) | |
| 160 def __iter__(self): | |
| 161 self.next = self.yield_both | |
| 162 return self | |
| 163 def yield_both(self): | |
| 164 self.next = self.yield_result | |
| 165 return self.both | |
| 166 def yield_result(self): | |
| 167 self.next = self.yield_both | |
| 168 stage = self.both.next() | |
| 169 return (stage.name, stage.next()) | |
| 170 | |
| 171 class echoServer: | |
| 172 """ a simple echo protocol, server side | |
| 173 | |
| 174 def echoServer(conn): | |
| 175 yield conn | |
| 176 for data in conn: | |
| 177 yield data | |
| 178 yield conn | |
| 179 """ | |
| 180 def __init__(self, conn): | |
| 181 self.conn = conn | |
| 182 def __iter__(self): | |
| 183 self.next = self.yield_conn | |
| 184 return self | |
| 185 def yield_conn(self): | |
| 186 self.next = self.yield_data | |
| 187 return self.conn | |
| 188 def yield_data(self): | |
| 189 self.next = self.yield_conn | |
| 190 return self.conn.next() | |
| 191 | |
| 192 class echoClient: | |
| 193 """ a simple echo client tester | |
| 194 | |
| 195 def echoClient(conn): | |
| 196 yield "testing" | |
| 197 yield conn | |
| 198 # signal that we are done | |
| 199 conn.d.callback(conn.next()) | |
| 200 """ | |
| 201 def __init__(self, conn): | |
| 202 self.conn = conn | |
| 203 def __iter__(self): | |
| 204 self.next = self.yield_testing | |
| 205 return self | |
| 206 def yield_testing(self): | |
| 207 self.next = self.yield_conn | |
| 208 return "testing" | |
| 209 def yield_conn(self): | |
| 210 self.next = self.yield_stop | |
| 211 return self.conn | |
| 212 def yield_stop(self): | |
| 213 # signal that we are done | |
| 214 self.conn.factory.d.callback(self.conn.next()) | |
| 215 raise StopIteration() | |
| 216 | |
| 217 class CountIterator: | |
| 218 def __init__(self, count): | |
| 219 self.count = count | |
| 220 def __iter__(self): | |
| 221 return self | |
| 222 def next(self): # this is run in a separate thread | |
| 223 sleep(.1) | |
| 224 val = self.count | |
| 225 if not(val): | |
| 226 raise StopIteration | |
| 227 self.count -= 1 | |
| 228 return val | |
| 229 | |
| 230 class FlowTest(unittest.TestCase): | |
| 231 def testNotReady(self): | |
| 232 x = flow.wrap([1,2,3]) | |
| 233 self.assertRaises(flow.NotReadyError,x.next) | |
| 234 | |
| 235 def testBasic(self): | |
| 236 lhs = ['string'] | |
| 237 rhs = list(flow.Block('string')) | |
| 238 self.assertEqual(lhs,rhs) | |
| 239 | |
| 240 def testBasicList(self): | |
| 241 lhs = [1,2,3] | |
| 242 rhs = list(flow.Block([1,2,flow.Cooperate(),3])) | |
| 243 self.assertEqual(lhs,rhs) | |
| 244 | |
| 245 def testBasicIterator(self): | |
| 246 lhs = ['one','two','three'] | |
| 247 rhs = list(flow.Block(slowlist(_onetwothree))) | |
| 248 self.assertEqual(lhs,rhs) | |
| 249 | |
| 250 def testCallable(self): | |
| 251 lhs = ['one','two','three'] | |
| 252 rhs = list(flow.Block(slowlist(_onetwothree))) | |
| 253 self.assertEqual(lhs,rhs) | |
| 254 | |
| 255 def testProducer(self): | |
| 256 lhs = [(1,'one'),(2,'two'),(3,'three')] | |
| 257 rhs = list(flow.Block(producer())) | |
| 258 self.assertEqual(lhs,rhs) | |
| 259 | |
| 260 def testConsumer(self): | |
| 261 lhs = ['Title',(1,'one'),(2,'two'),(3,'three')] | |
| 262 rhs = list(flow.Block(consumer)) | |
| 263 self.assertEqual(lhs,rhs) | |
| 264 | |
| 265 def testFailure(self): | |
| 266 self.assertRaises(ZeroDivisionError, list, flow.Block(badgen())) | |
| 267 self.assertEqual(['x',ZeroDivisionError], | |
| 268 list(flow.Block(badgen(),ZeroDivisionError))) | |
| 269 self.assertEqual(['x',ZeroDivisionError], | |
| 270 list(flow.Block(flow.wrap(badgen()), | |
| 271 ZeroDivisionError))) | |
| 272 | |
| 273 def testZip(self): | |
| 274 lhs = [(1,'a'),(2,'b'),(3,None)] | |
| 275 mrg = flow.Zip([1,2,flow.Cooperate(),3],['a','b']) | |
| 276 rhs = list(flow.Block(mrg)) | |
| 277 self.assertEqual(lhs,rhs) | |
| 278 | |
| 279 def testMerge(self): | |
| 280 lhs = ['one', 1, 'two', 2, 3, 'three'] | |
| 281 mrg = flow.Merge(slowlist(_onetwothree),slowlist([1,2,3])) | |
| 282 rhs = list(flow.Block(mrg)) | |
| 283 self.assertEqual(lhs,rhs) | |
| 284 | |
| 285 def testFilter(self): | |
| 286 def odd(val): | |
| 287 if val % 2: | |
| 288 return True | |
| 289 lhs = [ 1, 3 ] | |
| 290 mrg = flow.Filter(odd,slowlist([1,2,flow.Cooperate(),3])) | |
| 291 rhs = list(flow.Block(mrg)) | |
| 292 self.assertEqual(lhs,rhs) | |
| 293 | |
| 294 def testLineBreak(self): | |
| 295 lhs = [ "Hello World", "Happy Days Are Here" ] | |
| 296 rhs = ["Hello ","World\nHappy", flow.Cooperate(), | |
| 297 " Days"," Are Here\n"] | |
| 298 mrg = flow.LineBreak(slowlist(rhs), delimiter='\n') | |
| 299 rhs = list(flow.Block(mrg)) | |
| 300 self.assertEqual(lhs,rhs) | |
| 301 | |
| 302 def testDeferred(self): | |
| 303 lhs = ['Title', (1,'one'),(2,'two'),(3,'three')] | |
| 304 d = flow.Deferred(consumer()) | |
| 305 d.addCallback(self.assertEquals, lhs) | |
| 306 return d | |
| 307 | |
| 308 def testBuildList(self): | |
| 309 src = flow.wrap([1,2,3]) | |
| 310 out = flow.Block(buildlist(src)).next() | |
| 311 self.assertEquals(out,[1,2,3]) | |
| 312 | |
| 313 def testDeferredFailure(self): | |
| 314 d = flow.Deferred(badgen()) | |
| 315 return self.assertFailure(d, ZeroDivisionError) | |
| 316 | |
| 317 def testDeferredTrap(self): | |
| 318 d = flow.Deferred(badgen(), ZeroDivisionError) | |
| 319 d.addCallback(self.assertEqual, ['x', ZeroDivisionError]) | |
| 320 return d | |
| 321 | |
| 322 def testZipFailure(self): | |
| 323 lhs = [(1,'a'),(2,'b'),(3,'c')] | |
| 324 mrg = flow.Zip([1,2,flow.Cooperate(),3],badgen()) | |
| 325 d = flow.Deferred(mrg) | |
| 326 return self.assertFailure(d, ZeroDivisionError) | |
| 327 | |
| 328 def testDeferredWrapper(self): | |
| 329 a = defer.Deferred() | |
| 330 reactor.callLater(0, lambda: a.callback("test")) | |
| 331 b = flow.Merge(a, slowlist([1,2,flow.Cooperate(),3])) | |
| 332 d = flow.Deferred(b) | |
| 333 d.addCallback(self.assertEqual, [1, 2, 'test', 3]) | |
| 334 return d | |
| 335 | |
| 336 def testDeferredWrapperImmediate(self): | |
| 337 from twisted.internet import defer | |
| 338 a = defer.Deferred() | |
| 339 a.callback("test") | |
| 340 self.assertEquals(["test"], list(flow.Block(a))) | |
| 341 | |
| 342 def testDeferredWrapperFail(self): | |
| 343 d = defer.Deferred() | |
| 344 f = lambda: d.errback(flow.Failure(IOError())) | |
| 345 reactor.callLater(0, f) | |
| 346 return self.assertFailure(d, IOError) | |
| 347 | |
| 348 def testCallback(self): | |
| 349 cb = flow.Callback() | |
| 350 d = flow.Deferred(buildlist(cb)) | |
| 351 for x in range(9): | |
| 352 cb.result(x) | |
| 353 cb.finish() | |
| 354 d.addCallback(self.assertEqual, [range(9)]) | |
| 355 return d | |
| 356 | |
| 357 def testCallbackFailure(self): | |
| 358 cb = flow.Callback() | |
| 359 d = flow.Deferred(buildlist(cb)) | |
| 360 for x in range(3): | |
| 361 cb.result(x) | |
| 362 cb.errback(flow.Failure(IOError())) | |
| 363 return self.assertFailure(d, IOError) | |
| 364 | |
| 365 def testConcurrentCallback(self): | |
| 366 ca = flow.Callback() | |
| 367 ca.name = 'a' | |
| 368 cb = flow.Callback() | |
| 369 cb.name = 'b' | |
| 370 d = flow.Deferred(testconcur(ca,cb)) | |
| 371 ca.result(1) | |
| 372 cb.result(2) | |
| 373 ca.result(3) | |
| 374 ca.result(4) | |
| 375 ca.finish() | |
| 376 cb.result(5) | |
| 377 cb.finish() | |
| 378 d.addCallback(self.assertEquals, | |
| 379 [('a',1),('b',2),('a',3),('a',4),('b',5)]) | |
| 380 return d | |
| 381 | |
| 382 def testProtocolLocalhost(self): | |
| 383 # this fails if parallel tests are run on the same box | |
| 384 server = protocol.ServerFactory() | |
| 385 server.protocol = flow.Protocol | |
| 386 server.protocol.controller = echoServer | |
| 387 port = reactor.listenTCP(0, server) | |
| 388 client = protocol.ClientFactory() | |
| 389 client.protocol = flow.makeProtocol(echoClient) | |
| 390 client.d = defer.Deferred() | |
| 391 reactor.connectTCP("127.0.0.1", port.getHost().port, client) | |
| 392 client.d.addCallback(self.assertEquals, 'testing') | |
| 393 client.d.addBoth(lambda x : | |
| 394 client.protocol.transport.loseConnection()) | |
| 395 client.d.addBoth(lambda x : | |
| 396 defer.maybeDeferred(port.stopListening)) | |
| 397 return client.d | |
| 398 #testProtocolLocalhost.skip = "XXX freezes, fixme" | |
| 399 | |
| 400 def testProtocol(self): | |
| 401 from twisted.protocols import loopback | |
| 402 server = flow.Protocol() | |
| 403 server.controller = echoServer | |
| 404 client = flow.makeProtocol(echoClient)() | |
| 405 client.factory = protocol.ClientFactory() | |
| 406 client.factory.d = defer.Deferred() | |
| 407 d2 = loopback.loopbackAsync(server, client) | |
| 408 client.factory.d.addCallback(self.assertEquals, 'testing') | |
| 409 return defer.gatherResults([client.factory.d, d2]) | |
| 410 | |
| 411 | |
| 412 class ThreadedFlowTest(unittest.TestCase): | |
| 413 if interfaces.IReactorThreads(reactor, None) is None: | |
| 414 skip = ("No thread support in reactor, " | |
| 415 "cannot test threaded flow constructs.") | |
| 416 | |
| 417 | |
| 418 def testThreaded(self): | |
| 419 expect = [5,4,3,2,1] | |
| 420 d = flow.Deferred(Threaded(CountIterator(5))) | |
| 421 d.addCallback(self.assertEquals, expect) | |
| 422 return d | |
| 423 | |
| 424 def testThreadedError(self): | |
| 425 # is this the expected behaviour? | |
| 426 def iterator(): | |
| 427 yield 1 | |
| 428 raise ValueError | |
| 429 d = flow.Deferred(Threaded(iterator())) | |
| 430 return self.assertFailure(d, ValueError) | |
| 431 | |
| 432 def testThreadedSleep(self): | |
| 433 expect = [5,4,3,2,1] | |
| 434 d = flow.Deferred(Threaded(CountIterator(5))) | |
| 435 sleep(.5) | |
| 436 d.addCallback(self.assertEquals, expect) | |
| 437 return d | |
| 438 | |
| 439 def testQueryIterator(self): | |
| 440 try: | |
| 441 from pyPgSQL import PgSQL | |
| 442 dbpool = PgSQL | |
| 443 c = dbpool.connect() | |
| 444 r = c.cursor() | |
| 445 r.execute("SELECT 'x'") | |
| 446 r.fetchone() | |
| 447 except: | |
| 448 # PostgreSQL is not installed or bad permissions | |
| 449 return | |
| 450 expect = [['one'],['two'],['three']] | |
| 451 sql = """ | |
| 452 (SELECT 'one') | |
| 453 UNION ALL | |
| 454 (SELECT 'two') | |
| 455 UNION ALL | |
| 456 (SELECT 'three') | |
| 457 """ | |
| 458 d = flow.Deferred(Threaded(QueryIterator(dbpool, sql))) | |
| 459 d.addCallback(self.assertEquals, expect) | |
| 460 return d | |
| 461 | |
| 462 def testThreadedImmediate(self): | |
| 463 """ | |
| 464 The goal of this test is to test the callback mechanism with | |
| 465 regard to threads, namely to assure that results can be | |
| 466 accumulated before they are needed; and that left-over results | |
| 467 are immediately made available on the next round (even though | |
| 468 the producing thread has shut down). This is a very tough thing | |
| 469 to test due to the timing issues. So it may fail on some | |
| 470 platforms, I'm not sure. | |
| 471 """ | |
| 472 expect = [5,4,3,2,1] | |
| 473 result = [] | |
| 474 f = Threaded(CountIterator(5)) | |
| 475 d = defer.Deferred() | |
| 476 def process(): | |
| 477 coop = f._yield() | |
| 478 if f.results: | |
| 479 result.extend(f.results) | |
| 480 del f.results[:len(result)] | |
| 481 reactor.callLater(0, process) | |
| 482 return | |
| 483 if coop: | |
| 484 sleep(.3) | |
| 485 reactor.callLater(0, coop.callLater, process) | |
| 486 return | |
| 487 if f.stop: | |
| 488 reactor.callLater(0, d.callback, result) | |
| 489 reactor.callLater(0, process) | |
| 490 d.addCallback(self.assertEquals, expect) | |
| 491 return d | |
| OLD | NEW |