| Index: third_party/twisted_8_1/twisted/flow/test/test_flow.py
|
| diff --git a/third_party/twisted_8_1/twisted/flow/test/test_flow.py b/third_party/twisted_8_1/twisted/flow/test/test_flow.py
|
| deleted file mode 100644
|
| index e364a93416b01f1ca68872e5b38d9785250c6da9..0000000000000000000000000000000000000000
|
| --- a/third_party/twisted_8_1/twisted/flow/test/test_flow.py
|
| +++ /dev/null
|
| @@ -1,491 +0,0 @@
|
| -# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
|
| -# See LICENSE for details.
|
| -
|
| -#
|
| -# Author: Clark C. Evans
|
| -#
|
| -
|
| -from __future__ import nested_scopes
|
| -from __future__ import generators
|
| -
|
| -from twisted.flow import flow
|
| -from twisted.flow.threads import Threaded, QueryIterator
|
| -from twisted.trial import unittest
|
| -from twisted.python import failure
|
| -from twisted.internet import defer, reactor, protocol, interfaces
|
| -from time import sleep
|
| -
|
| -class slowlist:
|
| - """ this is a generator based list
|
| -
|
| - def slowlist(list):
|
| - list = list[:]
|
| - while list:
|
| - yield list.pop(0)
|
| -
|
| - It is primarly used to simulate generators by using
|
| - a list (for testing purposes) without being wrapped
|
| - as a flow.List, which has all kinds of shortcuts we
|
| - don't want for testing.
|
| - """
|
| - def __init__(self, list):
|
| - self.list = list[:]
|
| - def __iter__(self):
|
| - return self
|
| - def next(self):
|
| - if self.list:
|
| - return self.list.pop(0)
|
| - raise StopIteration
|
| -
|
| -_onetwothree = ['one','two',flow.Cooperate(),'three']
|
| -
|
| -class producer:
|
| - """ iterator version of the following generator...
|
| -
|
| - def producer():
|
| - lst = flow.wrap(slowlist([1,2,3]))
|
| - nam = flow.wrap(slowlist(_onetwothree))
|
| - while True:
|
| - yield lst
|
| - yield nam
|
| - yield (lst.next(),nam.next())
|
| -
|
| - """
|
| - def __iter__(self):
|
| - self.lst = flow.wrap(slowlist([1,2,3]))
|
| - self.nam = flow.wrap(slowlist(_onetwothree))
|
| - self.next = self.yield_lst
|
| - return self
|
| - def yield_lst(self):
|
| - self.next = self.yield_nam
|
| - return self.lst
|
| - def yield_nam(self):
|
| - self.next = self.yield_results
|
| - return self.nam
|
| - def yield_results(self):
|
| - self.next = self.yield_lst
|
| - return (self.lst.next(), self.nam.next())
|
| -
|
| -class consumer:
|
| - """ iterator version of the following generator...
|
| -
|
| - def consumer():
|
| - title = flow.wrap(['Title'])
|
| - prod = flow.wrap(producer())
|
| - yield title
|
| - yield title.next()
|
| - yield prod
|
| - for data in prod:
|
| - yield data
|
| - yield prod
|
| - """
|
| - def __iter__(self):
|
| - self.title = flow.wrap(['Title'])
|
| - self.lst = flow.wrap(producer())
|
| - self.next = self.yield_title
|
| - return self
|
| - def yield_title(self):
|
| - self.next = self.yield_title_result
|
| - return self.title
|
| - def yield_title_result(self):
|
| - self.next = self.yield_lst
|
| - return self.title.next()
|
| - def yield_lst(self):
|
| - self.next = self.yield_result
|
| - return self.lst
|
| - def yield_result(self):
|
| - self.next = self.yield_lst
|
| - return self.lst.next()
|
| -
|
| -
|
| -class badgen:
|
| - """ a bad generator...
|
| -
|
| - def badgen():
|
| - yield 'x'
|
| - err = 3/ 0
|
| - """
|
| - def __iter__(self):
|
| - self.next = self.yield_x
|
| - return self
|
| - def yield_x(self):
|
| - self.next = self.yield_done
|
| - return 'x'
|
| - def yield_done(self):
|
| - err = 3 / 0
|
| - raise StopIteration
|
| -
|
| -class buildlist:
|
| - """ building a list
|
| -
|
| - def buildlist(src):
|
| - out = []
|
| - yield src
|
| - for itm in src:
|
| - out.append(itm)
|
| - yield src
|
| - yield out
|
| - """
|
| - def __init__(self, src):
|
| - self.src = src
|
| - def __iter__(self):
|
| - self.out = []
|
| - self.next = self.yield_src
|
| - return self
|
| - def yield_src(self):
|
| - self.next = self.yield_append
|
| - return self.src
|
| - def yield_append(self):
|
| - try:
|
| - self.out.append(self.src.next())
|
| - except StopIteration:
|
| - self.next = self.yield_finish
|
| - return self.out
|
| - return self.src
|
| - def yield_finish(self):
|
| - raise StopIteration
|
| -
|
| -class testconcur:
|
| - """ interweving two concurrent stages
|
| -
|
| - def testconcur(*stages):
|
| - both = flow.Concurrent(*stages)
|
| - yield both
|
| - for stage in both:
|
| - yield (stage.name, stage.result)
|
| - yield both
|
| - """
|
| - def __init__(self, *stages):
|
| - self.both = flow.Concurrent(*stages)
|
| - def __iter__(self):
|
| - self.next = self.yield_both
|
| - return self
|
| - def yield_both(self):
|
| - self.next = self.yield_result
|
| - return self.both
|
| - def yield_result(self):
|
| - self.next = self.yield_both
|
| - stage = self.both.next()
|
| - return (stage.name, stage.next())
|
| -
|
| -class echoServer:
|
| - """ a simple echo protocol, server side
|
| -
|
| - def echoServer(conn):
|
| - yield conn
|
| - for data in conn:
|
| - yield data
|
| - yield conn
|
| - """
|
| - def __init__(self, conn):
|
| - self.conn = conn
|
| - def __iter__(self):
|
| - self.next = self.yield_conn
|
| - return self
|
| - def yield_conn(self):
|
| - self.next = self.yield_data
|
| - return self.conn
|
| - def yield_data(self):
|
| - self.next = self.yield_conn
|
| - return self.conn.next()
|
| -
|
| -class echoClient:
|
| - """ a simple echo client tester
|
| -
|
| - def echoClient(conn):
|
| - yield "testing"
|
| - yield conn
|
| - # signal that we are done
|
| - conn.d.callback(conn.next())
|
| - """
|
| - def __init__(self, conn):
|
| - self.conn = conn
|
| - def __iter__(self):
|
| - self.next = self.yield_testing
|
| - return self
|
| - def yield_testing(self):
|
| - self.next = self.yield_conn
|
| - return "testing"
|
| - def yield_conn(self):
|
| - self.next = self.yield_stop
|
| - return self.conn
|
| - def yield_stop(self):
|
| - # signal that we are done
|
| - self.conn.factory.d.callback(self.conn.next())
|
| - raise StopIteration()
|
| -
|
| -class CountIterator:
|
| - def __init__(self, count):
|
| - self.count = count
|
| - def __iter__(self):
|
| - return self
|
| - def next(self): # this is run in a separate thread
|
| - sleep(.1)
|
| - val = self.count
|
| - if not(val):
|
| - raise StopIteration
|
| - self.count -= 1
|
| - return val
|
| -
|
| -class FlowTest(unittest.TestCase):
|
| - def testNotReady(self):
|
| - x = flow.wrap([1,2,3])
|
| - self.assertRaises(flow.NotReadyError,x.next)
|
| -
|
| - def testBasic(self):
|
| - lhs = ['string']
|
| - rhs = list(flow.Block('string'))
|
| - self.assertEqual(lhs,rhs)
|
| -
|
| - def testBasicList(self):
|
| - lhs = [1,2,3]
|
| - rhs = list(flow.Block([1,2,flow.Cooperate(),3]))
|
| - self.assertEqual(lhs,rhs)
|
| -
|
| - def testBasicIterator(self):
|
| - lhs = ['one','two','three']
|
| - rhs = list(flow.Block(slowlist(_onetwothree)))
|
| - self.assertEqual(lhs,rhs)
|
| -
|
| - def testCallable(self):
|
| - lhs = ['one','two','three']
|
| - rhs = list(flow.Block(slowlist(_onetwothree)))
|
| - self.assertEqual(lhs,rhs)
|
| -
|
| - def testProducer(self):
|
| - lhs = [(1,'one'),(2,'two'),(3,'three')]
|
| - rhs = list(flow.Block(producer()))
|
| - self.assertEqual(lhs,rhs)
|
| -
|
| - def testConsumer(self):
|
| - lhs = ['Title',(1,'one'),(2,'two'),(3,'three')]
|
| - rhs = list(flow.Block(consumer))
|
| - self.assertEqual(lhs,rhs)
|
| -
|
| - def testFailure(self):
|
| - self.assertRaises(ZeroDivisionError, list, flow.Block(badgen()))
|
| - self.assertEqual(['x',ZeroDivisionError],
|
| - list(flow.Block(badgen(),ZeroDivisionError)))
|
| - self.assertEqual(['x',ZeroDivisionError],
|
| - list(flow.Block(flow.wrap(badgen()),
|
| - ZeroDivisionError)))
|
| -
|
| - def testZip(self):
|
| - lhs = [(1,'a'),(2,'b'),(3,None)]
|
| - mrg = flow.Zip([1,2,flow.Cooperate(),3],['a','b'])
|
| - rhs = list(flow.Block(mrg))
|
| - self.assertEqual(lhs,rhs)
|
| -
|
| - def testMerge(self):
|
| - lhs = ['one', 1, 'two', 2, 3, 'three']
|
| - mrg = flow.Merge(slowlist(_onetwothree),slowlist([1,2,3]))
|
| - rhs = list(flow.Block(mrg))
|
| - self.assertEqual(lhs,rhs)
|
| -
|
| - def testFilter(self):
|
| - def odd(val):
|
| - if val % 2:
|
| - return True
|
| - lhs = [ 1, 3 ]
|
| - mrg = flow.Filter(odd,slowlist([1,2,flow.Cooperate(),3]))
|
| - rhs = list(flow.Block(mrg))
|
| - self.assertEqual(lhs,rhs)
|
| -
|
| - def testLineBreak(self):
|
| - lhs = [ "Hello World", "Happy Days Are Here" ]
|
| - rhs = ["Hello ","World\nHappy", flow.Cooperate(),
|
| - " Days"," Are Here\n"]
|
| - mrg = flow.LineBreak(slowlist(rhs), delimiter='\n')
|
| - rhs = list(flow.Block(mrg))
|
| - self.assertEqual(lhs,rhs)
|
| -
|
| - def testDeferred(self):
|
| - lhs = ['Title', (1,'one'),(2,'two'),(3,'three')]
|
| - d = flow.Deferred(consumer())
|
| - d.addCallback(self.assertEquals, lhs)
|
| - return d
|
| -
|
| - def testBuildList(self):
|
| - src = flow.wrap([1,2,3])
|
| - out = flow.Block(buildlist(src)).next()
|
| - self.assertEquals(out,[1,2,3])
|
| -
|
| - def testDeferredFailure(self):
|
| - d = flow.Deferred(badgen())
|
| - return self.assertFailure(d, ZeroDivisionError)
|
| -
|
| - def testDeferredTrap(self):
|
| - d = flow.Deferred(badgen(), ZeroDivisionError)
|
| - d.addCallback(self.assertEqual, ['x', ZeroDivisionError])
|
| - return d
|
| -
|
| - def testZipFailure(self):
|
| - lhs = [(1,'a'),(2,'b'),(3,'c')]
|
| - mrg = flow.Zip([1,2,flow.Cooperate(),3],badgen())
|
| - d = flow.Deferred(mrg)
|
| - return self.assertFailure(d, ZeroDivisionError)
|
| -
|
| - def testDeferredWrapper(self):
|
| - a = defer.Deferred()
|
| - reactor.callLater(0, lambda: a.callback("test"))
|
| - b = flow.Merge(a, slowlist([1,2,flow.Cooperate(),3]))
|
| - d = flow.Deferred(b)
|
| - d.addCallback(self.assertEqual, [1, 2, 'test', 3])
|
| - return d
|
| -
|
| - def testDeferredWrapperImmediate(self):
|
| - from twisted.internet import defer
|
| - a = defer.Deferred()
|
| - a.callback("test")
|
| - self.assertEquals(["test"], list(flow.Block(a)))
|
| -
|
| - def testDeferredWrapperFail(self):
|
| - d = defer.Deferred()
|
| - f = lambda: d.errback(flow.Failure(IOError()))
|
| - reactor.callLater(0, f)
|
| - return self.assertFailure(d, IOError)
|
| -
|
| - def testCallback(self):
|
| - cb = flow.Callback()
|
| - d = flow.Deferred(buildlist(cb))
|
| - for x in range(9):
|
| - cb.result(x)
|
| - cb.finish()
|
| - d.addCallback(self.assertEqual, [range(9)])
|
| - return d
|
| -
|
| - def testCallbackFailure(self):
|
| - cb = flow.Callback()
|
| - d = flow.Deferred(buildlist(cb))
|
| - for x in range(3):
|
| - cb.result(x)
|
| - cb.errback(flow.Failure(IOError()))
|
| - return self.assertFailure(d, IOError)
|
| -
|
| - def testConcurrentCallback(self):
|
| - ca = flow.Callback()
|
| - ca.name = 'a'
|
| - cb = flow.Callback()
|
| - cb.name = 'b'
|
| - d = flow.Deferred(testconcur(ca,cb))
|
| - ca.result(1)
|
| - cb.result(2)
|
| - ca.result(3)
|
| - ca.result(4)
|
| - ca.finish()
|
| - cb.result(5)
|
| - cb.finish()
|
| - d.addCallback(self.assertEquals,
|
| - [('a',1),('b',2),('a',3),('a',4),('b',5)])
|
| - return d
|
| -
|
| - def testProtocolLocalhost(self):
|
| - # this fails if parallel tests are run on the same box
|
| - server = protocol.ServerFactory()
|
| - server.protocol = flow.Protocol
|
| - server.protocol.controller = echoServer
|
| - port = reactor.listenTCP(0, server)
|
| - client = protocol.ClientFactory()
|
| - client.protocol = flow.makeProtocol(echoClient)
|
| - client.d = defer.Deferred()
|
| - reactor.connectTCP("127.0.0.1", port.getHost().port, client)
|
| - client.d.addCallback(self.assertEquals, 'testing')
|
| - client.d.addBoth(lambda x :
|
| - client.protocol.transport.loseConnection())
|
| - client.d.addBoth(lambda x :
|
| - defer.maybeDeferred(port.stopListening))
|
| - return client.d
|
| - #testProtocolLocalhost.skip = "XXX freezes, fixme"
|
| -
|
| - def testProtocol(self):
|
| - from twisted.protocols import loopback
|
| - server = flow.Protocol()
|
| - server.controller = echoServer
|
| - client = flow.makeProtocol(echoClient)()
|
| - client.factory = protocol.ClientFactory()
|
| - client.factory.d = defer.Deferred()
|
| - d2 = loopback.loopbackAsync(server, client)
|
| - client.factory.d.addCallback(self.assertEquals, 'testing')
|
| - return defer.gatherResults([client.factory.d, d2])
|
| -
|
| -
|
| -class ThreadedFlowTest(unittest.TestCase):
|
| - if interfaces.IReactorThreads(reactor, None) is None:
|
| - skip = ("No thread support in reactor, "
|
| - "cannot test threaded flow constructs.")
|
| -
|
| -
|
| - def testThreaded(self):
|
| - expect = [5,4,3,2,1]
|
| - d = flow.Deferred(Threaded(CountIterator(5)))
|
| - d.addCallback(self.assertEquals, expect)
|
| - return d
|
| -
|
| - def testThreadedError(self):
|
| - # is this the expected behaviour?
|
| - def iterator():
|
| - yield 1
|
| - raise ValueError
|
| - d = flow.Deferred(Threaded(iterator()))
|
| - return self.assertFailure(d, ValueError)
|
| -
|
| - def testThreadedSleep(self):
|
| - expect = [5,4,3,2,1]
|
| - d = flow.Deferred(Threaded(CountIterator(5)))
|
| - sleep(.5)
|
| - d.addCallback(self.assertEquals, expect)
|
| - return d
|
| -
|
| - def testQueryIterator(self):
|
| - try:
|
| - from pyPgSQL import PgSQL
|
| - dbpool = PgSQL
|
| - c = dbpool.connect()
|
| - r = c.cursor()
|
| - r.execute("SELECT 'x'")
|
| - r.fetchone()
|
| - except:
|
| - # PostgreSQL is not installed or bad permissions
|
| - return
|
| - expect = [['one'],['two'],['three']]
|
| - sql = """
|
| - (SELECT 'one')
|
| - UNION ALL
|
| - (SELECT 'two')
|
| - UNION ALL
|
| - (SELECT 'three')
|
| - """
|
| - d = flow.Deferred(Threaded(QueryIterator(dbpool, sql)))
|
| - d.addCallback(self.assertEquals, expect)
|
| - return d
|
| -
|
| - def testThreadedImmediate(self):
|
| - """
|
| - The goal of this test is to test the callback mechanism with
|
| - regard to threads, namely to assure that results can be
|
| - accumulated before they are needed; and that left-over results
|
| - are immediately made available on the next round (even though
|
| - the producing thread has shut down). This is a very tough thing
|
| - to test due to the timing issues. So it may fail on some
|
| - platforms, I'm not sure.
|
| - """
|
| - expect = [5,4,3,2,1]
|
| - result = []
|
| - f = Threaded(CountIterator(5))
|
| - d = defer.Deferred()
|
| - def process():
|
| - coop = f._yield()
|
| - if f.results:
|
| - result.extend(f.results)
|
| - del f.results[:len(result)]
|
| - reactor.callLater(0, process)
|
| - return
|
| - if coop:
|
| - sleep(.3)
|
| - reactor.callLater(0, coop.callLater, process)
|
| - return
|
| - if f.stop:
|
| - reactor.callLater(0, d.callback, result)
|
| - reactor.callLater(0, process)
|
| - d.addCallback(self.assertEquals, expect)
|
| - return d
|
|
|