Index: third_party/twisted_8_1/twisted/flow/test/test_flow.py |
diff --git a/third_party/twisted_8_1/twisted/flow/test/test_flow.py b/third_party/twisted_8_1/twisted/flow/test/test_flow.py |
deleted file mode 100644 |
index e364a93416b01f1ca68872e5b38d9785250c6da9..0000000000000000000000000000000000000000 |
--- a/third_party/twisted_8_1/twisted/flow/test/test_flow.py |
+++ /dev/null |
@@ -1,491 +0,0 @@ |
-# Copyright (c) 2001-2004 Twisted Matrix Laboratories. |
-# See LICENSE for details. |
- |
-# |
-# Author: Clark C. Evans |
-# |
- |
-from __future__ import nested_scopes |
-from __future__ import generators |
- |
-from twisted.flow import flow |
-from twisted.flow.threads import Threaded, QueryIterator |
-from twisted.trial import unittest |
-from twisted.python import failure |
-from twisted.internet import defer, reactor, protocol, interfaces |
-from time import sleep |
- |
-class slowlist: |
- """ this is a generator based list |
- |
- def slowlist(list): |
- list = list[:] |
- while list: |
- yield list.pop(0) |
- |
- It is primarly used to simulate generators by using |
- a list (for testing purposes) without being wrapped |
- as a flow.List, which has all kinds of shortcuts we |
- don't want for testing. |
- """ |
- def __init__(self, list): |
- self.list = list[:] |
- def __iter__(self): |
- return self |
- def next(self): |
- if self.list: |
- return self.list.pop(0) |
- raise StopIteration |
- |
-_onetwothree = ['one','two',flow.Cooperate(),'three'] |
- |
-class producer: |
- """ iterator version of the following generator... |
- |
- def producer(): |
- lst = flow.wrap(slowlist([1,2,3])) |
- nam = flow.wrap(slowlist(_onetwothree)) |
- while True: |
- yield lst |
- yield nam |
- yield (lst.next(),nam.next()) |
- |
- """ |
- def __iter__(self): |
- self.lst = flow.wrap(slowlist([1,2,3])) |
- self.nam = flow.wrap(slowlist(_onetwothree)) |
- self.next = self.yield_lst |
- return self |
- def yield_lst(self): |
- self.next = self.yield_nam |
- return self.lst |
- def yield_nam(self): |
- self.next = self.yield_results |
- return self.nam |
- def yield_results(self): |
- self.next = self.yield_lst |
- return (self.lst.next(), self.nam.next()) |
- |
-class consumer: |
- """ iterator version of the following generator... |
- |
- def consumer(): |
- title = flow.wrap(['Title']) |
- prod = flow.wrap(producer()) |
- yield title |
- yield title.next() |
- yield prod |
- for data in prod: |
- yield data |
- yield prod |
- """ |
- def __iter__(self): |
- self.title = flow.wrap(['Title']) |
- self.lst = flow.wrap(producer()) |
- self.next = self.yield_title |
- return self |
- def yield_title(self): |
- self.next = self.yield_title_result |
- return self.title |
- def yield_title_result(self): |
- self.next = self.yield_lst |
- return self.title.next() |
- def yield_lst(self): |
- self.next = self.yield_result |
- return self.lst |
- def yield_result(self): |
- self.next = self.yield_lst |
- return self.lst.next() |
- |
- |
-class badgen: |
- """ a bad generator... |
- |
- def badgen(): |
- yield 'x' |
- err = 3/ 0 |
- """ |
- def __iter__(self): |
- self.next = self.yield_x |
- return self |
- def yield_x(self): |
- self.next = self.yield_done |
- return 'x' |
- def yield_done(self): |
- err = 3 / 0 |
- raise StopIteration |
- |
-class buildlist: |
- """ building a list |
- |
- def buildlist(src): |
- out = [] |
- yield src |
- for itm in src: |
- out.append(itm) |
- yield src |
- yield out |
- """ |
- def __init__(self, src): |
- self.src = src |
- def __iter__(self): |
- self.out = [] |
- self.next = self.yield_src |
- return self |
- def yield_src(self): |
- self.next = self.yield_append |
- return self.src |
- def yield_append(self): |
- try: |
- self.out.append(self.src.next()) |
- except StopIteration: |
- self.next = self.yield_finish |
- return self.out |
- return self.src |
- def yield_finish(self): |
- raise StopIteration |
- |
-class testconcur: |
- """ interweving two concurrent stages |
- |
- def testconcur(*stages): |
- both = flow.Concurrent(*stages) |
- yield both |
- for stage in both: |
- yield (stage.name, stage.result) |
- yield both |
- """ |
- def __init__(self, *stages): |
- self.both = flow.Concurrent(*stages) |
- def __iter__(self): |
- self.next = self.yield_both |
- return self |
- def yield_both(self): |
- self.next = self.yield_result |
- return self.both |
- def yield_result(self): |
- self.next = self.yield_both |
- stage = self.both.next() |
- return (stage.name, stage.next()) |
- |
-class echoServer: |
- """ a simple echo protocol, server side |
- |
- def echoServer(conn): |
- yield conn |
- for data in conn: |
- yield data |
- yield conn |
- """ |
- def __init__(self, conn): |
- self.conn = conn |
- def __iter__(self): |
- self.next = self.yield_conn |
- return self |
- def yield_conn(self): |
- self.next = self.yield_data |
- return self.conn |
- def yield_data(self): |
- self.next = self.yield_conn |
- return self.conn.next() |
- |
-class echoClient: |
- """ a simple echo client tester |
- |
- def echoClient(conn): |
- yield "testing" |
- yield conn |
- # signal that we are done |
- conn.d.callback(conn.next()) |
- """ |
- def __init__(self, conn): |
- self.conn = conn |
- def __iter__(self): |
- self.next = self.yield_testing |
- return self |
- def yield_testing(self): |
- self.next = self.yield_conn |
- return "testing" |
- def yield_conn(self): |
- self.next = self.yield_stop |
- return self.conn |
- def yield_stop(self): |
- # signal that we are done |
- self.conn.factory.d.callback(self.conn.next()) |
- raise StopIteration() |
- |
-class CountIterator: |
- def __init__(self, count): |
- self.count = count |
- def __iter__(self): |
- return self |
- def next(self): # this is run in a separate thread |
- sleep(.1) |
- val = self.count |
- if not(val): |
- raise StopIteration |
- self.count -= 1 |
- return val |
- |
-class FlowTest(unittest.TestCase): |
- def testNotReady(self): |
- x = flow.wrap([1,2,3]) |
- self.assertRaises(flow.NotReadyError,x.next) |
- |
- def testBasic(self): |
- lhs = ['string'] |
- rhs = list(flow.Block('string')) |
- self.assertEqual(lhs,rhs) |
- |
- def testBasicList(self): |
- lhs = [1,2,3] |
- rhs = list(flow.Block([1,2,flow.Cooperate(),3])) |
- self.assertEqual(lhs,rhs) |
- |
- def testBasicIterator(self): |
- lhs = ['one','two','three'] |
- rhs = list(flow.Block(slowlist(_onetwothree))) |
- self.assertEqual(lhs,rhs) |
- |
- def testCallable(self): |
- lhs = ['one','two','three'] |
- rhs = list(flow.Block(slowlist(_onetwothree))) |
- self.assertEqual(lhs,rhs) |
- |
- def testProducer(self): |
- lhs = [(1,'one'),(2,'two'),(3,'three')] |
- rhs = list(flow.Block(producer())) |
- self.assertEqual(lhs,rhs) |
- |
- def testConsumer(self): |
- lhs = ['Title',(1,'one'),(2,'two'),(3,'three')] |
- rhs = list(flow.Block(consumer)) |
- self.assertEqual(lhs,rhs) |
- |
- def testFailure(self): |
- self.assertRaises(ZeroDivisionError, list, flow.Block(badgen())) |
- self.assertEqual(['x',ZeroDivisionError], |
- list(flow.Block(badgen(),ZeroDivisionError))) |
- self.assertEqual(['x',ZeroDivisionError], |
- list(flow.Block(flow.wrap(badgen()), |
- ZeroDivisionError))) |
- |
- def testZip(self): |
- lhs = [(1,'a'),(2,'b'),(3,None)] |
- mrg = flow.Zip([1,2,flow.Cooperate(),3],['a','b']) |
- rhs = list(flow.Block(mrg)) |
- self.assertEqual(lhs,rhs) |
- |
- def testMerge(self): |
- lhs = ['one', 1, 'two', 2, 3, 'three'] |
- mrg = flow.Merge(slowlist(_onetwothree),slowlist([1,2,3])) |
- rhs = list(flow.Block(mrg)) |
- self.assertEqual(lhs,rhs) |
- |
- def testFilter(self): |
- def odd(val): |
- if val % 2: |
- return True |
- lhs = [ 1, 3 ] |
- mrg = flow.Filter(odd,slowlist([1,2,flow.Cooperate(),3])) |
- rhs = list(flow.Block(mrg)) |
- self.assertEqual(lhs,rhs) |
- |
- def testLineBreak(self): |
- lhs = [ "Hello World", "Happy Days Are Here" ] |
- rhs = ["Hello ","World\nHappy", flow.Cooperate(), |
- " Days"," Are Here\n"] |
- mrg = flow.LineBreak(slowlist(rhs), delimiter='\n') |
- rhs = list(flow.Block(mrg)) |
- self.assertEqual(lhs,rhs) |
- |
- def testDeferred(self): |
- lhs = ['Title', (1,'one'),(2,'two'),(3,'three')] |
- d = flow.Deferred(consumer()) |
- d.addCallback(self.assertEquals, lhs) |
- return d |
- |
- def testBuildList(self): |
- src = flow.wrap([1,2,3]) |
- out = flow.Block(buildlist(src)).next() |
- self.assertEquals(out,[1,2,3]) |
- |
- def testDeferredFailure(self): |
- d = flow.Deferred(badgen()) |
- return self.assertFailure(d, ZeroDivisionError) |
- |
- def testDeferredTrap(self): |
- d = flow.Deferred(badgen(), ZeroDivisionError) |
- d.addCallback(self.assertEqual, ['x', ZeroDivisionError]) |
- return d |
- |
- def testZipFailure(self): |
- lhs = [(1,'a'),(2,'b'),(3,'c')] |
- mrg = flow.Zip([1,2,flow.Cooperate(),3],badgen()) |
- d = flow.Deferred(mrg) |
- return self.assertFailure(d, ZeroDivisionError) |
- |
- def testDeferredWrapper(self): |
- a = defer.Deferred() |
- reactor.callLater(0, lambda: a.callback("test")) |
- b = flow.Merge(a, slowlist([1,2,flow.Cooperate(),3])) |
- d = flow.Deferred(b) |
- d.addCallback(self.assertEqual, [1, 2, 'test', 3]) |
- return d |
- |
- def testDeferredWrapperImmediate(self): |
- from twisted.internet import defer |
- a = defer.Deferred() |
- a.callback("test") |
- self.assertEquals(["test"], list(flow.Block(a))) |
- |
- def testDeferredWrapperFail(self): |
- d = defer.Deferred() |
- f = lambda: d.errback(flow.Failure(IOError())) |
- reactor.callLater(0, f) |
- return self.assertFailure(d, IOError) |
- |
- def testCallback(self): |
- cb = flow.Callback() |
- d = flow.Deferred(buildlist(cb)) |
- for x in range(9): |
- cb.result(x) |
- cb.finish() |
- d.addCallback(self.assertEqual, [range(9)]) |
- return d |
- |
- def testCallbackFailure(self): |
- cb = flow.Callback() |
- d = flow.Deferred(buildlist(cb)) |
- for x in range(3): |
- cb.result(x) |
- cb.errback(flow.Failure(IOError())) |
- return self.assertFailure(d, IOError) |
- |
- def testConcurrentCallback(self): |
- ca = flow.Callback() |
- ca.name = 'a' |
- cb = flow.Callback() |
- cb.name = 'b' |
- d = flow.Deferred(testconcur(ca,cb)) |
- ca.result(1) |
- cb.result(2) |
- ca.result(3) |
- ca.result(4) |
- ca.finish() |
- cb.result(5) |
- cb.finish() |
- d.addCallback(self.assertEquals, |
- [('a',1),('b',2),('a',3),('a',4),('b',5)]) |
- return d |
- |
- def testProtocolLocalhost(self): |
- # this fails if parallel tests are run on the same box |
- server = protocol.ServerFactory() |
- server.protocol = flow.Protocol |
- server.protocol.controller = echoServer |
- port = reactor.listenTCP(0, server) |
- client = protocol.ClientFactory() |
- client.protocol = flow.makeProtocol(echoClient) |
- client.d = defer.Deferred() |
- reactor.connectTCP("127.0.0.1", port.getHost().port, client) |
- client.d.addCallback(self.assertEquals, 'testing') |
- client.d.addBoth(lambda x : |
- client.protocol.transport.loseConnection()) |
- client.d.addBoth(lambda x : |
- defer.maybeDeferred(port.stopListening)) |
- return client.d |
- #testProtocolLocalhost.skip = "XXX freezes, fixme" |
- |
- def testProtocol(self): |
- from twisted.protocols import loopback |
- server = flow.Protocol() |
- server.controller = echoServer |
- client = flow.makeProtocol(echoClient)() |
- client.factory = protocol.ClientFactory() |
- client.factory.d = defer.Deferred() |
- d2 = loopback.loopbackAsync(server, client) |
- client.factory.d.addCallback(self.assertEquals, 'testing') |
- return defer.gatherResults([client.factory.d, d2]) |
- |
- |
-class ThreadedFlowTest(unittest.TestCase): |
- if interfaces.IReactorThreads(reactor, None) is None: |
- skip = ("No thread support in reactor, " |
- "cannot test threaded flow constructs.") |
- |
- |
- def testThreaded(self): |
- expect = [5,4,3,2,1] |
- d = flow.Deferred(Threaded(CountIterator(5))) |
- d.addCallback(self.assertEquals, expect) |
- return d |
- |
- def testThreadedError(self): |
- # is this the expected behaviour? |
- def iterator(): |
- yield 1 |
- raise ValueError |
- d = flow.Deferred(Threaded(iterator())) |
- return self.assertFailure(d, ValueError) |
- |
- def testThreadedSleep(self): |
- expect = [5,4,3,2,1] |
- d = flow.Deferred(Threaded(CountIterator(5))) |
- sleep(.5) |
- d.addCallback(self.assertEquals, expect) |
- return d |
- |
- def testQueryIterator(self): |
- try: |
- from pyPgSQL import PgSQL |
- dbpool = PgSQL |
- c = dbpool.connect() |
- r = c.cursor() |
- r.execute("SELECT 'x'") |
- r.fetchone() |
- except: |
- # PostgreSQL is not installed or bad permissions |
- return |
- expect = [['one'],['two'],['three']] |
- sql = """ |
- (SELECT 'one') |
- UNION ALL |
- (SELECT 'two') |
- UNION ALL |
- (SELECT 'three') |
- """ |
- d = flow.Deferred(Threaded(QueryIterator(dbpool, sql))) |
- d.addCallback(self.assertEquals, expect) |
- return d |
- |
- def testThreadedImmediate(self): |
- """ |
- The goal of this test is to test the callback mechanism with |
- regard to threads, namely to assure that results can be |
- accumulated before they are needed; and that left-over results |
- are immediately made available on the next round (even though |
- the producing thread has shut down). This is a very tough thing |
- to test due to the timing issues. So it may fail on some |
- platforms, I'm not sure. |
- """ |
- expect = [5,4,3,2,1] |
- result = [] |
- f = Threaded(CountIterator(5)) |
- d = defer.Deferred() |
- def process(): |
- coop = f._yield() |
- if f.results: |
- result.extend(f.results) |
- del f.results[:len(result)] |
- reactor.callLater(0, process) |
- return |
- if coop: |
- sleep(.3) |
- reactor.callLater(0, coop.callLater, process) |
- return |
- if f.stop: |
- reactor.callLater(0, d.callback, result) |
- reactor.callLater(0, process) |
- d.addCallback(self.assertEquals, expect) |
- return d |