Index: third_party/twisted_8_1/twisted/test/test_loopback.py |
diff --git a/third_party/twisted_8_1/twisted/test/test_loopback.py b/third_party/twisted_8_1/twisted/test/test_loopback.py |
deleted file mode 100644 |
index 53b2bfa4a484d1519b470886d25c7f73d1dfdf92..0000000000000000000000000000000000000000 |
--- a/third_party/twisted_8_1/twisted/test/test_loopback.py |
+++ /dev/null |
@@ -1,328 +0,0 @@ |
-# Copyright (c) 2001-2004 Twisted Matrix Laboratories. |
-# See LICENSE for details. |
- |
- |
-""" |
-Test case for twisted.protocols.loopback |
-""" |
- |
-from zope.interface import implements |
- |
-from twisted.trial import unittest |
-from twisted.trial.util import suppress as SUPPRESS |
-from twisted.protocols import basic, loopback |
-from twisted.internet import defer |
-from twisted.internet.protocol import Protocol |
-from twisted.internet.defer import Deferred |
-from twisted.internet.interfaces import IAddress, IPushProducer, IPullProducer |
-from twisted.internet import reactor |
- |
- |
-class SimpleProtocol(basic.LineReceiver): |
- def __init__(self): |
- self.conn = defer.Deferred() |
- self.lines = [] |
- self.connLost = [] |
- |
- def connectionMade(self): |
- self.conn.callback(None) |
- |
- def lineReceived(self, line): |
- self.lines.append(line) |
- |
- def connectionLost(self, reason): |
- self.connLost.append(reason) |
- |
- |
-class DoomProtocol(SimpleProtocol): |
- i = 0 |
- def lineReceived(self, line): |
- self.i += 1 |
- if self.i < 4: |
- # by this point we should have connection closed, |
- # but just in case we didn't we won't ever send 'Hello 4' |
- self.sendLine("Hello %d" % self.i) |
- SimpleProtocol.lineReceived(self, line) |
- if self.lines[-1] == "Hello 3": |
- self.transport.loseConnection() |
- |
- |
-class LoopbackTestCaseMixin: |
- def testRegularFunction(self): |
- s = SimpleProtocol() |
- c = SimpleProtocol() |
- |
- def sendALine(result): |
- s.sendLine("THIS IS LINE ONE!") |
- s.transport.loseConnection() |
- s.conn.addCallback(sendALine) |
- |
- def check(ignored): |
- self.assertEquals(c.lines, ["THIS IS LINE ONE!"]) |
- self.assertEquals(len(s.connLost), 1) |
- self.assertEquals(len(c.connLost), 1) |
- d = defer.maybeDeferred(self.loopbackFunc, s, c) |
- d.addCallback(check) |
- return d |
- |
- def testSneakyHiddenDoom(self): |
- s = DoomProtocol() |
- c = DoomProtocol() |
- |
- def sendALine(result): |
- s.sendLine("DOOM LINE") |
- s.conn.addCallback(sendALine) |
- |
- def check(ignored): |
- self.assertEquals(s.lines, ['Hello 1', 'Hello 2', 'Hello 3']) |
- self.assertEquals(c.lines, ['DOOM LINE', 'Hello 1', 'Hello 2', 'Hello 3']) |
- self.assertEquals(len(s.connLost), 1) |
- self.assertEquals(len(c.connLost), 1) |
- d = defer.maybeDeferred(self.loopbackFunc, s, c) |
- d.addCallback(check) |
- return d |
- |
- |
- |
-class LoopbackTestCase(LoopbackTestCaseMixin, unittest.TestCase): |
- loopbackFunc = staticmethod(loopback.loopback) |
- |
- def testRegularFunction(self): |
- """ |
- Suppress loopback deprecation warning. |
- """ |
- return LoopbackTestCaseMixin.testRegularFunction(self) |
- testRegularFunction.suppress = [ |
- SUPPRESS(message="loopback\(\) is deprecated", |
- category=DeprecationWarning)] |
- |
- |
- |
-class LoopbackAsyncTestCase(LoopbackTestCase): |
- loopbackFunc = staticmethod(loopback.loopbackAsync) |
- |
- |
- def test_makeConnection(self): |
- """ |
- Test that the client and server protocol both have makeConnection |
- invoked on them by loopbackAsync. |
- """ |
- class TestProtocol(Protocol): |
- transport = None |
- def makeConnection(self, transport): |
- self.transport = transport |
- |
- server = TestProtocol() |
- client = TestProtocol() |
- loopback.loopbackAsync(server, client) |
- self.failIfEqual(client.transport, None) |
- self.failIfEqual(server.transport, None) |
- |
- |
- def _hostpeertest(self, get, testServer): |
- """ |
- Test one of the permutations of client/server host/peer. |
- """ |
- class TestProtocol(Protocol): |
- def makeConnection(self, transport): |
- Protocol.makeConnection(self, transport) |
- self.onConnection.callback(transport) |
- |
- if testServer: |
- server = TestProtocol() |
- d = server.onConnection = Deferred() |
- client = Protocol() |
- else: |
- server = Protocol() |
- client = TestProtocol() |
- d = client.onConnection = Deferred() |
- |
- loopback.loopbackAsync(server, client) |
- |
- def connected(transport): |
- host = getattr(transport, get)() |
- self.failUnless(IAddress.providedBy(host)) |
- |
- return d.addCallback(connected) |
- |
- |
- def test_serverHost(self): |
- """ |
- Test that the server gets a transport with a properly functioning |
- implementation of L{ITransport.getHost}. |
- """ |
- return self._hostpeertest("getHost", True) |
- |
- |
- def test_serverPeer(self): |
- """ |
- Like C{test_serverHost} but for L{ITransport.getPeer} |
- """ |
- return self._hostpeertest("getPeer", True) |
- |
- |
- def test_clientHost(self, get="getHost"): |
- """ |
- Test that the client gets a transport with a properly functioning |
- implementation of L{ITransport.getHost}. |
- """ |
- return self._hostpeertest("getHost", False) |
- |
- |
- def test_clientPeer(self): |
- """ |
- Like C{test_clientHost} but for L{ITransport.getPeer}. |
- """ |
- return self._hostpeertest("getPeer", False) |
- |
- |
- def _greetingtest(self, write, testServer): |
- """ |
- Test one of the permutations of write/writeSequence client/server. |
- """ |
- class GreeteeProtocol(Protocol): |
- bytes = "" |
- def dataReceived(self, bytes): |
- self.bytes += bytes |
- if self.bytes == "bytes": |
- self.received.callback(None) |
- |
- class GreeterProtocol(Protocol): |
- def connectionMade(self): |
- getattr(self.transport, write)("bytes") |
- |
- if testServer: |
- server = GreeterProtocol() |
- client = GreeteeProtocol() |
- d = client.received = Deferred() |
- else: |
- server = GreeteeProtocol() |
- d = server.received = Deferred() |
- client = GreeterProtocol() |
- |
- loopback.loopbackAsync(server, client) |
- return d |
- |
- |
- def test_clientGreeting(self): |
- """ |
- Test that on a connection where the client speaks first, the server |
- receives the bytes sent by the client. |
- """ |
- return self._greetingtest("write", False) |
- |
- |
- def test_clientGreetingSequence(self): |
- """ |
- Like C{test_clientGreeting}, but use C{writeSequence} instead of |
- C{write} to issue the greeting. |
- """ |
- return self._greetingtest("writeSequence", False) |
- |
- |
- def test_serverGreeting(self, write="write"): |
- """ |
- Test that on a connection where the server speaks first, the client |
- receives the bytes sent by the server. |
- """ |
- return self._greetingtest("write", True) |
- |
- |
- def test_serverGreetingSequence(self): |
- """ |
- Like C{test_serverGreeting}, but use C{writeSequence} instead of |
- C{write} to issue the greeting. |
- """ |
- return self._greetingtest("writeSequence", True) |
- |
- |
- def _producertest(self, producerClass): |
- toProduce = map(str, range(0, 10)) |
- |
- class ProducingProtocol(Protocol): |
- def connectionMade(self): |
- self.producer = producerClass(list(toProduce)) |
- self.producer.start(self.transport) |
- |
- class ReceivingProtocol(Protocol): |
- bytes = "" |
- def dataReceived(self, bytes): |
- self.bytes += bytes |
- if self.bytes == ''.join(toProduce): |
- self.received.callback((client, server)) |
- |
- server = ProducingProtocol() |
- client = ReceivingProtocol() |
- client.received = Deferred() |
- |
- loopback.loopbackAsync(server, client) |
- return client.received |
- |
- |
- def test_pushProducer(self): |
- """ |
- Test a push producer registered against a loopback transport. |
- """ |
- class PushProducer(object): |
- implements(IPushProducer) |
- resumed = False |
- |
- def __init__(self, toProduce): |
- self.toProduce = toProduce |
- |
- def resumeProducing(self): |
- self.resumed = True |
- |
- def start(self, consumer): |
- self.consumer = consumer |
- consumer.registerProducer(self, True) |
- self._produceAndSchedule() |
- |
- def _produceAndSchedule(self): |
- if self.toProduce: |
- self.consumer.write(self.toProduce.pop(0)) |
- reactor.callLater(0, self._produceAndSchedule) |
- else: |
- self.consumer.unregisterProducer() |
- d = self._producertest(PushProducer) |
- |
- def finished((client, server)): |
- self.failIf( |
- server.producer.resumed, |
- "Streaming producer should not have been resumed.") |
- d.addCallback(finished) |
- return d |
- |
- |
- def test_pullProducer(self): |
- """ |
- Test a pull producer registered against a loopback transport. |
- """ |
- class PullProducer(object): |
- implements(IPullProducer) |
- |
- def __init__(self, toProduce): |
- self.toProduce = toProduce |
- |
- def start(self, consumer): |
- self.consumer = consumer |
- self.consumer.registerProducer(self, False) |
- |
- def resumeProducing(self): |
- self.consumer.write(self.toProduce.pop(0)) |
- if not self.toProduce: |
- self.consumer.unregisterProducer() |
- return self._producertest(PullProducer) |
- |
- |
-class LoopbackTCPTestCase(LoopbackTestCase): |
- loopbackFunc = staticmethod(loopback.loopbackTCP) |
- |
- |
-class LoopbackUNIXTestCase(LoopbackTestCase): |
- loopbackFunc = staticmethod(loopback.loopbackUNIX) |
- |
- def setUp(self): |
- from twisted.internet import reactor, interfaces |
- if interfaces.IReactorUNIX(reactor, None) is None: |
- raise unittest.SkipTest("Current reactor does not support UNIX sockets") |