Index: third_party/twisted_8_1/twisted/test/test_tcp.py |
diff --git a/third_party/twisted_8_1/twisted/test/test_tcp.py b/third_party/twisted_8_1/twisted/test/test_tcp.py |
deleted file mode 100644 |
index 96d4c0288a012adcdfdf9d43b40963ebee591c2f..0000000000000000000000000000000000000000 |
--- a/third_party/twisted_8_1/twisted/test/test_tcp.py |
+++ /dev/null |
@@ -1,1776 +0,0 @@ |
-# Copyright (c) 2001-2008 Twisted Matrix Laboratories. |
-# See LICENSE for details. |
- |
-""" |
-Tests for implementations of L{IReactorTCP}. |
-""" |
- |
-import socket, random, errno |
- |
-from zope.interface import implements |
- |
-from twisted.trial import unittest |
- |
-from twisted.python.log import msg |
-from twisted.internet import protocol, reactor, defer, interfaces |
-from twisted.internet import error |
-from twisted.internet.address import IPv4Address |
-from twisted.internet.interfaces import IHalfCloseableProtocol, IPullProducer |
-from twisted.protocols import policies |
- |
-def loopUntil(predicate, interval=0): |
- """ |
- Poor excuse for an event notification helper. This polls a condition and |
- calls back a Deferred when it is seen to be true. |
- |
- Do not use this function. |
- """ |
- from twisted.internet import task |
- d = defer.Deferred() |
- def check(): |
- res = predicate() |
- if res: |
- d.callback(res) |
- call = task.LoopingCall(check) |
- def stop(result): |
- call.stop() |
- return result |
- d.addCallback(stop) |
- d2 = call.start(interval) |
- d2.addErrback(d.errback) |
- return d |
- |
- |
-class ClosingProtocol(protocol.Protocol): |
- |
- def connectionMade(self): |
- self.transport.loseConnection() |
- |
- def connectionLost(self, reason): |
- reason.trap(error.ConnectionDone) |
- |
-class ClosingFactory(protocol.ServerFactory): |
- """Factory that closes port immediatley.""" |
- |
- def buildProtocol(self, conn): |
- self.port.stopListening() |
- return ClosingProtocol() |
- |
- |
-class MyProtocol(protocol.Protocol): |
- made = closed = failed = 0 |
- |
- closedDeferred = None |
- |
- data = "" |
- |
- factory = None |
- |
- def connectionMade(self): |
- self.made = 1 |
- if (self.factory is not None and |
- self.factory.protocolConnectionMade is not None): |
- d = self.factory.protocolConnectionMade |
- self.factory.protocolConnectionMade = None |
- d.callback(self) |
- |
- def dataReceived(self, data): |
- self.data += data |
- |
- def connectionLost(self, reason): |
- self.closed = 1 |
- if self.closedDeferred is not None: |
- d, self.closedDeferred = self.closedDeferred, None |
- d.callback(None) |
- |
- |
-class MyProtocolFactoryMixin(object): |
- """ |
- Mixin for factories which create L{MyProtocol} instances. |
- |
- @type protocolFactory: no-argument callable |
- @ivar protocolFactory: Factory for protocols - takes the place of the |
- typical C{protocol} attribute of factories (but that name is used by |
- this class for something else). |
- |
- @type protocolConnectionMade: L{NoneType} or L{defer.Deferred} |
- @ivar protocolConnectionMade: When an instance of L{MyProtocol} is |
- connected, if this is not C{None}, the L{Deferred} will be called |
- back with the protocol instance and the attribute set to C{None}. |
- |
- @type protocolConnectionLost: L{NoneType} or L{defer.Deferred} |
- @ivar protocolConnectionLost: When an instance of L{MyProtocol} is |
- created, this will be set as its C{closedDeferred} attribute and |
- then this attribute will be set to C{None} so the L{defer.Deferred} |
- is not used by more than one protocol. |
- |
- @ivar protocol: The most recently created L{MyProtocol} instance which |
- was returned from C{buildProtocol}. |
- |
- @type called: C{int} |
- @ivar called: A counter which is incremented each time C{buildProtocol} |
- is called. |
- |
- @ivar peerAddresses: A C{list} of the addresses passed to C{buildProtocol}. |
- """ |
- protocolFactory = MyProtocol |
- |
- protocolConnectionMade = None |
- protocolConnectionLost = None |
- protocol = None |
- called = 0 |
- |
- def __init__(self): |
- self.peerAddresses = [] |
- |
- |
- def buildProtocol(self, addr): |
- """ |
- Create a L{MyProtocol} and set it up to be able to perform |
- callbacks. |
- """ |
- self.peerAddresses.append(addr) |
- self.called += 1 |
- p = self.protocolFactory() |
- p.factory = self |
- p.closedDeferred = self.protocolConnectionLost |
- self.protocolConnectionLost = None |
- self.protocol = p |
- return p |
- |
- |
- |
-class MyServerFactory(MyProtocolFactoryMixin, protocol.ServerFactory): |
- """ |
- Server factory which creates L{MyProtocol} instances. |
- """ |
- |
- |
- |
-class MyClientFactory(MyProtocolFactoryMixin, protocol.ClientFactory): |
- """ |
- Client factory which creates L{MyProtocol} instances. |
- """ |
- failed = 0 |
- stopped = 0 |
- |
- def __init__(self): |
- MyProtocolFactoryMixin.__init__(self) |
- self.deferred = defer.Deferred() |
- self.failDeferred = defer.Deferred() |
- |
- def clientConnectionFailed(self, connector, reason): |
- self.failed = 1 |
- self.reason = reason |
- self.failDeferred.callback(None) |
- |
- def clientConnectionLost(self, connector, reason): |
- self.lostReason = reason |
- self.deferred.callback(None) |
- |
- def stopFactory(self): |
- self.stopped = 1 |
- |
- |
- |
-class ListeningTestCase(unittest.TestCase): |
- |
- def test_listen(self): |
- """ |
- L{IReactorTCP.listenTCP} returns an object which provides |
- L{IListeningPort}. |
- """ |
- f = MyServerFactory() |
- p1 = reactor.listenTCP(0, f, interface="127.0.0.1") |
- self.addCleanup(p1.stopListening) |
- self.failUnless(interfaces.IListeningPort.providedBy(p1)) |
- |
- |
- def testStopListening(self): |
- """ |
- The L{IListeningPort} returned by L{IReactorTCP.listenTCP} can be |
- stopped with its C{stopListening} method. After the L{Deferred} it |
- (optionally) returns has been called back, the port number can be bound |
- to a new server. |
- """ |
- f = MyServerFactory() |
- port = reactor.listenTCP(0, f, interface="127.0.0.1") |
- n = port.getHost().port |
- |
- def cbStopListening(ignored): |
- # Make sure we can rebind the port right away |
- port = reactor.listenTCP(n, f, interface="127.0.0.1") |
- return port.stopListening() |
- |
- d = defer.maybeDeferred(port.stopListening) |
- d.addCallback(cbStopListening) |
- return d |
- |
- |
- def testNumberedInterface(self): |
- f = MyServerFactory() |
- # listen only on the loopback interface |
- p1 = reactor.listenTCP(0, f, interface='127.0.0.1') |
- return p1.stopListening() |
- |
- def testPortRepr(self): |
- f = MyServerFactory() |
- p = reactor.listenTCP(0, f) |
- portNo = str(p.getHost().port) |
- self.failIf(repr(p).find(portNo) == -1) |
- def stoppedListening(ign): |
- self.failIf(repr(p).find(portNo) != -1) |
- d = defer.maybeDeferred(p.stopListening) |
- return d.addCallback(stoppedListening) |
- |
- |
- def test_serverRepr(self): |
- """ |
- Check that the repr string of the server transport get the good port |
- number if the server listens on 0. |
- """ |
- server = MyServerFactory() |
- serverConnMade = server.protocolConnectionMade = defer.Deferred() |
- port = reactor.listenTCP(0, server) |
- self.addCleanup(port.stopListening) |
- |
- client = MyClientFactory() |
- clientConnMade = client.protocolConnectionMade = defer.Deferred() |
- connector = reactor.connectTCP("127.0.0.1", |
- port.getHost().port, client) |
- self.addCleanup(connector.disconnect) |
- def check((serverProto, clientProto)): |
- portNumber = port.getHost().port |
- self.assertEquals(repr(serverProto.transport), |
- "<MyProtocol #0 on %s>" % (portNumber,)) |
- serverProto.transport.loseConnection() |
- clientProto.transport.loseConnection() |
- return defer.gatherResults([serverConnMade, clientConnMade] |
- ).addCallback(check) |
- |
- |
- |
-def callWithSpew(f): |
- from twisted.python.util import spewerWithLinenums as spewer |
- import sys |
- sys.settrace(spewer) |
- try: |
- f() |
- finally: |
- sys.settrace(None) |
- |
-class LoopbackTestCase(unittest.TestCase): |
- """ |
- Test loopback connections. |
- """ |
- def test_closePortInProtocolFactory(self): |
- """ |
- A port created with L{IReactorTCP.listenTCP} can be connected to with |
- L{IReactorTCP.connectTCP}. |
- """ |
- f = ClosingFactory() |
- port = reactor.listenTCP(0, f, interface="127.0.0.1") |
- self.addCleanup(port.stopListening) |
- portNumber = port.getHost().port |
- f.port = port |
- clientF = MyClientFactory() |
- reactor.connectTCP("127.0.0.1", portNumber, clientF) |
- def check(x): |
- self.assertTrue(clientF.protocol.made) |
- self.assertTrue(port.disconnected) |
- clientF.lostReason.trap(error.ConnectionDone) |
- return clientF.deferred.addCallback(check) |
- |
- def _trapCnxDone(self, obj): |
- getattr(obj, 'trap', lambda x: None)(error.ConnectionDone) |
- |
- |
- def _connectedClientAndServerTest(self, callback): |
- """ |
- Invoke the given callback with a client protocol and a server protocol |
- which have been connected to each other. |
- """ |
- serverFactory = MyServerFactory() |
- serverConnMade = defer.Deferred() |
- serverFactory.protocolConnectionMade = serverConnMade |
- port = reactor.listenTCP(0, serverFactory, interface="127.0.0.1") |
- self.addCleanup(port.stopListening) |
- |
- portNumber = port.getHost().port |
- clientF = MyClientFactory() |
- clientConnMade = defer.Deferred() |
- clientF.protocolConnectionMade = clientConnMade |
- reactor.connectTCP("127.0.0.1", portNumber, clientF) |
- |
- connsMade = defer.gatherResults([serverConnMade, clientConnMade]) |
- def connected((serverProtocol, clientProtocol)): |
- callback(serverProtocol, clientProtocol) |
- serverProtocol.transport.loseConnection() |
- clientProtocol.transport.loseConnection() |
- connsMade.addCallback(connected) |
- return connsMade |
- |
- |
- def test_tcpNoDelay(self): |
- """ |
- The transport of a protocol connected with L{IReactorTCP.connectTCP} or |
- L{IReactor.TCP.listenTCP} can have its I{TCP_NODELAY} state inspected |
- and manipulated with L{ITCPTransport.getTcpNoDelay} and |
- L{ITCPTransport.setTcpNoDelay}. |
- """ |
- def check(serverProtocol, clientProtocol): |
- for p in [serverProtocol, clientProtocol]: |
- transport = p.transport |
- self.assertEquals(transport.getTcpNoDelay(), 0) |
- transport.setTcpNoDelay(1) |
- self.assertEquals(transport.getTcpNoDelay(), 1) |
- transport.setTcpNoDelay(0) |
- self.assertEquals(transport.getTcpNoDelay(), 0) |
- return self._connectedClientAndServerTest(check) |
- |
- |
- def test_tcpKeepAlive(self): |
- """ |
- The transport of a protocol connected with L{IReactorTCP.connectTCP} or |
- L{IReactor.TCP.listenTCP} can have its I{SO_KEEPALIVE} state inspected |
- and manipulated with L{ITCPTransport.getTcpKeepAlive} and |
- L{ITCPTransport.setTcpKeepAlive}. |
- """ |
- def check(serverProtocol, clientProtocol): |
- for p in [serverProtocol, clientProtocol]: |
- transport = p.transport |
- self.assertEquals(transport.getTcpKeepAlive(), 0) |
- transport.setTcpKeepAlive(1) |
- self.assertEquals(transport.getTcpKeepAlive(), 1) |
- transport.setTcpKeepAlive(0) |
- self.assertEquals(transport.getTcpKeepAlive(), 0) |
- return self._connectedClientAndServerTest(check) |
- |
- |
- def testFailing(self): |
- clientF = MyClientFactory() |
- # XXX we assume no one is listening on TCP port 69 |
- reactor.connectTCP("127.0.0.1", 69, clientF, timeout=5) |
- def check(ignored): |
- clientF.reason.trap(error.ConnectionRefusedError) |
- return clientF.failDeferred.addCallback(check) |
- |
- |
- def test_connectionRefusedErrorNumber(self): |
- """ |
- Assert that the error number of the ConnectionRefusedError is |
- ECONNREFUSED, and not some other socket related error. |
- """ |
- |
- # Bind a number of ports in the operating system. We will attempt |
- # to connect to these in turn immediately after closing them, in the |
- # hopes that no one else has bound them in the mean time. Any |
- # connection which succeeds is ignored and causes us to move on to |
- # the next port. As soon as a connection attempt fails, we move on |
- # to making an assertion about how it failed. If they all succeed, |
- # the test will fail. |
- |
- # It would be nice to have a simpler, reliable way to cause a |
- # connection failure from the platform. |
- # |
- # On Linux (2.6.15), connecting to port 0 always fails. FreeBSD |
- # (5.4) rejects the connection attempt with EADDRNOTAVAIL. |
- # |
- # On FreeBSD (5.4), listening on a port and then repeatedly |
- # connecting to it without ever accepting any connections eventually |
- # leads to an ECONNREFUSED. On Linux (2.6.15), a seemingly |
- # unbounded number of connections succeed. |
- |
- serverSockets = [] |
- for i in xrange(10): |
- serverSocket = socket.socket() |
- serverSocket.bind(('127.0.0.1', 0)) |
- serverSocket.listen(1) |
- serverSockets.append(serverSocket) |
- random.shuffle(serverSockets) |
- |
- clientCreator = protocol.ClientCreator(reactor, protocol.Protocol) |
- |
- def tryConnectFailure(): |
- def connected(proto): |
- """ |
- Darn. Kill it and try again, if there are any tries left. |
- """ |
- proto.transport.loseConnection() |
- if serverSockets: |
- return tryConnectFailure() |
- self.fail("Could not fail to connect - could not test errno for that case.") |
- |
- serverSocket = serverSockets.pop() |
- serverHost, serverPort = serverSocket.getsockname() |
- serverSocket.close() |
- |
- connectDeferred = clientCreator.connectTCP(serverHost, serverPort) |
- connectDeferred.addCallback(connected) |
- return connectDeferred |
- |
- refusedDeferred = tryConnectFailure() |
- self.assertFailure(refusedDeferred, error.ConnectionRefusedError) |
- def connRefused(exc): |
- self.assertEqual(exc.osError, errno.ECONNREFUSED) |
- refusedDeferred.addCallback(connRefused) |
- def cleanup(passthrough): |
- while serverSockets: |
- serverSockets.pop().close() |
- return passthrough |
- refusedDeferred.addBoth(cleanup) |
- return refusedDeferred |
- |
- |
- def test_connectByServiceFail(self): |
- """ |
- Connecting to a named service which does not exist raises |
- L{error.ServiceNameUnknownError}. |
- """ |
- self.assertRaises( |
- error.ServiceNameUnknownError, |
- reactor.connectTCP, |
- "127.0.0.1", "thisbetternotexist", MyClientFactory()) |
- |
- |
- def test_connectByService(self): |
- """ |
- L{IReactorTCP.connectTCP} accepts the name of a service instead of a |
- port number and connects to the port number associated with that |
- service, as defined by L{socket.getservbyname}. |
- """ |
- serverFactory = MyServerFactory() |
- serverConnMade = defer.Deferred() |
- serverFactory.protocolConnectionMade = serverConnMade |
- port = reactor.listenTCP(0, serverFactory, interface="127.0.0.1") |
- self.addCleanup(port.stopListening) |
- portNumber = port.getHost().port |
- clientFactory = MyClientFactory() |
- clientConnMade = defer.Deferred() |
- clientFactory.protocolConnectionMade = clientConnMade |
- |
- def fakeGetServicePortByName(serviceName, protocolName): |
- if serviceName == 'http' and protocolName == 'tcp': |
- return portNumber |
- return 10 |
- self.patch(socket, 'getservbyname', fakeGetServicePortByName) |
- |
- c = reactor.connectTCP('127.0.0.1', 'http', clientFactory) |
- |
- connMade = defer.gatherResults([serverConnMade, clientConnMade]) |
- def connected((serverProtocol, clientProtocol)): |
- self.assertTrue( |
- serverFactory.called, |
- "Server factory was not called upon to build a protocol.") |
- serverProtocol.transport.loseConnection() |
- clientProtocol.transport.loseConnection() |
- connMade.addCallback(connected) |
- return connMade |
- |
- |
-class StartStopFactory(protocol.Factory): |
- |
- started = 0 |
- stopped = 0 |
- |
- def startFactory(self): |
- if self.started or self.stopped: |
- raise RuntimeError |
- self.started = 1 |
- |
- def stopFactory(self): |
- if not self.started or self.stopped: |
- raise RuntimeError |
- self.stopped = 1 |
- |
- |
-class ClientStartStopFactory(MyClientFactory): |
- |
- started = 0 |
- stopped = 0 |
- |
- def startFactory(self): |
- if self.started or self.stopped: |
- raise RuntimeError |
- self.started = 1 |
- |
- def stopFactory(self): |
- if not self.started or self.stopped: |
- raise RuntimeError |
- self.stopped = 1 |
- |
- |
-class FactoryTestCase(unittest.TestCase): |
- """Tests for factories.""" |
- |
- def test_serverStartStop(self): |
- """ |
- The factory passed to L{IReactorTCP.listenTCP} should be started only |
- when it transitions from being used on no ports to being used on one |
- port and should be stopped only when it transitions from being used on |
- one port to being used on no ports. |
- """ |
- # Note - this test doesn't need to use listenTCP. It is exercising |
- # logic implemented in Factory.doStart and Factory.doStop, so it could |
- # just call that directly. Some other test can make sure that |
- # listenTCP and stopListening correctly call doStart and |
- # doStop. -exarkun |
- |
- f = StartStopFactory() |
- |
- # listen on port |
- p1 = reactor.listenTCP(0, f, interface='127.0.0.1') |
- self.addCleanup(p1.stopListening) |
- |
- self.assertEqual((f.started, f.stopped), (1, 0)) |
- |
- # listen on two more ports |
- p2 = reactor.listenTCP(0, f, interface='127.0.0.1') |
- p3 = reactor.listenTCP(0, f, interface='127.0.0.1') |
- |
- self.assertEqual((f.started, f.stopped), (1, 0)) |
- |
- # close two ports |
- d1 = defer.maybeDeferred(p1.stopListening) |
- d2 = defer.maybeDeferred(p2.stopListening) |
- closedDeferred = defer.gatherResults([d1, d2]) |
- def cbClosed(ignored): |
- self.assertEqual((f.started, f.stopped), (1, 0)) |
- # Close the last port |
- return p3.stopListening() |
- closedDeferred.addCallback(cbClosed) |
- |
- def cbClosedAll(ignored): |
- self.assertEquals((f.started, f.stopped), (1, 1)) |
- closedDeferred.addCallback(cbClosedAll) |
- return closedDeferred |
- |
- |
- def test_clientStartStop(self): |
- """ |
- The factory passed to L{IReactorTCP.connectTCP} should be started when |
- the connection attempt starts and stopped when it is over. |
- """ |
- f = ClosingFactory() |
- p = reactor.listenTCP(0, f, interface="127.0.0.1") |
- self.addCleanup(p.stopListening) |
- portNumber = p.getHost().port |
- f.port = p |
- |
- factory = ClientStartStopFactory() |
- reactor.connectTCP("127.0.0.1", portNumber, factory) |
- self.assertTrue(factory.started) |
- return loopUntil(lambda: factory.stopped) |
- |
- |
- |
-class ConnectorTestCase(unittest.TestCase): |
- |
- def test_connectorIdentity(self): |
- """ |
- L{IReactorTCP.connectTCP} returns an object which provides |
- L{IConnector}. The destination of the connector is the address which |
- was passed to C{connectTCP}. The same connector object is passed to |
- the factory's C{startedConnecting} method as to the factory's |
- C{clientConnectionLost} method. |
- """ |
- serverFactory = ClosingFactory() |
- tcpPort = reactor.listenTCP(0, serverFactory, interface="127.0.0.1") |
- self.addCleanup(tcpPort.stopListening) |
- portNumber = tcpPort.getHost().port |
- serverFactory.port = tcpPort |
- |
- seenConnectors = [] |
- seenFailures = [] |
- |
- clientFactory = ClientStartStopFactory() |
- clientFactory.clientConnectionLost = ( |
- lambda connector, reason: (seenConnectors.append(connector), |
- seenFailures.append(reason))) |
- clientFactory.startedConnecting = seenConnectors.append |
- |
- connector = reactor.connectTCP("127.0.0.1", portNumber, clientFactory) |
- self.assertTrue(interfaces.IConnector.providedBy(connector)) |
- dest = connector.getDestination() |
- self.assertEquals(dest.type, "TCP") |
- self.assertEquals(dest.host, "127.0.0.1") |
- self.assertEquals(dest.port, portNumber) |
- |
- d = loopUntil(lambda: clientFactory.stopped) |
- def clientFactoryStopped(ignored): |
- seenFailures[0].trap(error.ConnectionDone) |
- self.assertEqual(seenConnectors, [connector, connector]) |
- d.addCallback(clientFactoryStopped) |
- return d |
- |
- |
- def test_userFail(self): |
- """ |
- Calling L{IConnector.stopConnecting} in C{Factory.startedConnecting} |
- results in C{Factory.clientConnectionFailed} being called with |
- L{error.UserError} as the reason. |
- """ |
- serverFactory = MyServerFactory() |
- tcpPort = reactor.listenTCP(0, serverFactory, interface="127.0.0.1") |
- self.addCleanup(tcpPort.stopListening) |
- portNumber = tcpPort.getHost().port |
- |
- def startedConnecting(connector): |
- connector.stopConnecting() |
- |
- clientFactory = ClientStartStopFactory() |
- clientFactory.startedConnecting = startedConnecting |
- reactor.connectTCP("127.0.0.1", portNumber, clientFactory) |
- |
- d = loopUntil(lambda: clientFactory.stopped) |
- def check(ignored): |
- self.assertEquals(clientFactory.failed, 1) |
- clientFactory.reason.trap(error.UserError) |
- return d.addCallback(check) |
- |
- |
- def test_reconnect(self): |
- """ |
- Calling L{IConnector.connect} in C{Factory.clientConnectionLost} causes |
- a new connection attempt to be made. |
- """ |
- serverFactory = ClosingFactory() |
- tcpPort = reactor.listenTCP(0, serverFactory, interface="127.0.0.1") |
- self.addCleanup(tcpPort.stopListening) |
- portNumber = tcpPort.getHost().port |
- serverFactory.port = tcpPort |
- |
- clientFactory = MyClientFactory() |
- |
- def clientConnectionLost(connector, reason): |
- connector.connect() |
- clientFactory.clientConnectionLost = clientConnectionLost |
- reactor.connectTCP("127.0.0.1", portNumber, clientFactory) |
- |
- d = loopUntil(lambda: clientFactory.failed) |
- def reconnectFailed(ignored): |
- p = clientFactory.protocol |
- self.assertEqual((p.made, p.closed), (1, 1)) |
- clientFactory.reason.trap(error.ConnectionRefusedError) |
- self.assertEqual(clientFactory.stopped, 1) |
- return d.addCallback(reconnectFailed) |
- |
- |
- |
-class CannotBindTestCase(unittest.TestCase): |
- """ |
- Tests for correct behavior when a reactor cannot bind to the required TCP |
- port. |
- """ |
- |
- def test_cannotBind(self): |
- """ |
- L{IReactorTCP.listenTCP} raises L{error.CannotListenError} if the |
- address to listen on is already in use. |
- """ |
- f = MyServerFactory() |
- |
- p1 = reactor.listenTCP(0, f, interface='127.0.0.1') |
- self.addCleanup(p1.stopListening) |
- n = p1.getHost().port |
- dest = p1.getHost() |
- self.assertEquals(dest.type, "TCP") |
- self.assertEquals(dest.host, "127.0.0.1") |
- self.assertEquals(dest.port, n) |
- |
- # make sure new listen raises error |
- self.assertRaises(error.CannotListenError, |
- reactor.listenTCP, n, f, interface='127.0.0.1') |
- |
- |
- |
- def _fireWhenDoneFunc(self, d, f): |
- """Returns closure that when called calls f and then callbacks d. |
- """ |
- from twisted.python import util as tputil |
- def newf(*args, **kw): |
- rtn = f(*args, **kw) |
- d.callback('') |
- return rtn |
- return tputil.mergeFunctionMetadata(f, newf) |
- |
- |
- def test_clientBind(self): |
- """ |
- L{IReactorTCP.connectTCP} calls C{Factory.clientConnectionFailed} with |
- L{error.ConnectBindError} if the bind address specified is already in |
- use. |
- """ |
- theDeferred = defer.Deferred() |
- sf = MyServerFactory() |
- sf.startFactory = self._fireWhenDoneFunc(theDeferred, sf.startFactory) |
- p = reactor.listenTCP(0, sf, interface="127.0.0.1") |
- self.addCleanup(p.stopListening) |
- |
- def _connect1(results): |
- d = defer.Deferred() |
- cf1 = MyClientFactory() |
- cf1.buildProtocol = self._fireWhenDoneFunc(d, cf1.buildProtocol) |
- reactor.connectTCP("127.0.0.1", p.getHost().port, cf1, |
- bindAddress=("127.0.0.1", 0)) |
- d.addCallback(_conmade, cf1) |
- return d |
- |
- def _conmade(results, cf1): |
- d = defer.Deferred() |
- cf1.protocol.connectionMade = self._fireWhenDoneFunc( |
- d, cf1.protocol.connectionMade) |
- d.addCallback(_check1connect2, cf1) |
- return d |
- |
- def _check1connect2(results, cf1): |
- self.assertEquals(cf1.protocol.made, 1) |
- |
- d1 = defer.Deferred() |
- d2 = defer.Deferred() |
- port = cf1.protocol.transport.getHost().port |
- cf2 = MyClientFactory() |
- cf2.clientConnectionFailed = self._fireWhenDoneFunc( |
- d1, cf2.clientConnectionFailed) |
- cf2.stopFactory = self._fireWhenDoneFunc(d2, cf2.stopFactory) |
- reactor.connectTCP("127.0.0.1", p.getHost().port, cf2, |
- bindAddress=("127.0.0.1", port)) |
- d1.addCallback(_check2failed, cf1, cf2) |
- d2.addCallback(_check2stopped, cf1, cf2) |
- dl = defer.DeferredList([d1, d2]) |
- dl.addCallback(_stop, cf1, cf2) |
- return dl |
- |
- def _check2failed(results, cf1, cf2): |
- self.assertEquals(cf2.failed, 1) |
- cf2.reason.trap(error.ConnectBindError) |
- self.assertTrue(cf2.reason.check(error.ConnectBindError)) |
- return results |
- |
- def _check2stopped(results, cf1, cf2): |
- self.assertEquals(cf2.stopped, 1) |
- return results |
- |
- def _stop(results, cf1, cf2): |
- d = defer.Deferred() |
- d.addCallback(_check1cleanup, cf1) |
- cf1.stopFactory = self._fireWhenDoneFunc(d, cf1.stopFactory) |
- cf1.protocol.transport.loseConnection() |
- return d |
- |
- def _check1cleanup(results, cf1): |
- self.assertEquals(cf1.stopped, 1) |
- |
- theDeferred.addCallback(_connect1) |
- return theDeferred |
- |
- |
- |
-class MyOtherClientFactory(protocol.ClientFactory): |
- def buildProtocol(self, address): |
- self.address = address |
- self.protocol = MyProtocol() |
- return self.protocol |
- |
- |
- |
-class LocalRemoteAddressTestCase(unittest.TestCase): |
- """ |
- Tests for correct getHost/getPeer values and that the correct address is |
- passed to buildProtocol. |
- """ |
- def test_hostAddress(self): |
- """ |
- L{IListeningPort.getHost} returns the same address as a client |
- connection's L{ITCPTransport.getPeer}. |
- """ |
- f1 = MyServerFactory() |
- p1 = reactor.listenTCP(0, f1, interface='127.0.0.1') |
- self.addCleanup(p1.stopListening) |
- n = p1.getHost().port |
- |
- f2 = MyOtherClientFactory() |
- p2 = reactor.connectTCP('127.0.0.1', n, f2) |
- |
- d = loopUntil(lambda :p2.state == "connected") |
- def check(ignored): |
- self.assertEquals(p1.getHost(), f2.address) |
- self.assertEquals(p1.getHost(), f2.protocol.transport.getPeer()) |
- return p1.stopListening() |
- def cleanup(ignored): |
- p2.transport.loseConnection() |
- return d.addCallback(check).addCallback(cleanup) |
- |
- |
-class WriterProtocol(protocol.Protocol): |
- def connectionMade(self): |
- # use everything ITransport claims to provide. If something here |
- # fails, the exception will be written to the log, but it will not |
- # directly flunk the test. The test will fail when maximum number of |
- # iterations have passed and the writer's factory.done has not yet |
- # been set. |
- self.transport.write("Hello Cleveland!\n") |
- seq = ["Goodbye", " cruel", " world", "\n"] |
- self.transport.writeSequence(seq) |
- peer = self.transport.getPeer() |
- if peer.type != "TCP": |
- print "getPeer returned non-TCP socket:", peer |
- self.factory.problem = 1 |
- us = self.transport.getHost() |
- if us.type != "TCP": |
- print "getHost returned non-TCP socket:", us |
- self.factory.problem = 1 |
- self.factory.done = 1 |
- |
- self.transport.loseConnection() |
- |
-class ReaderProtocol(protocol.Protocol): |
- def dataReceived(self, data): |
- self.factory.data += data |
- def connectionLost(self, reason): |
- self.factory.done = 1 |
- |
-class WriterClientFactory(protocol.ClientFactory): |
- def __init__(self): |
- self.done = 0 |
- self.data = "" |
- def buildProtocol(self, addr): |
- p = ReaderProtocol() |
- p.factory = self |
- self.protocol = p |
- return p |
- |
-class WriteDataTestCase(unittest.TestCase): |
- """ |
- Test that connected TCP sockets can actually write data. Try to exercise |
- the entire ITransport interface. |
- """ |
- |
- def test_writer(self): |
- """ |
- L{ITCPTransport.write} and L{ITCPTransport.writeSequence} send bytes to |
- the other end of the connection. |
- """ |
- f = protocol.Factory() |
- f.protocol = WriterProtocol |
- f.done = 0 |
- f.problem = 0 |
- wrappedF = WiredFactory(f) |
- p = reactor.listenTCP(0, wrappedF, interface="127.0.0.1") |
- self.addCleanup(p.stopListening) |
- n = p.getHost().port |
- clientF = WriterClientFactory() |
- wrappedClientF = WiredFactory(clientF) |
- reactor.connectTCP("127.0.0.1", n, wrappedClientF) |
- |
- def check(ignored): |
- self.failUnless(f.done, "writer didn't finish, it probably died") |
- self.failUnless(f.problem == 0, "writer indicated an error") |
- self.failUnless(clientF.done, |
- "client didn't see connection dropped") |
- expected = "".join(["Hello Cleveland!\n", |
- "Goodbye", " cruel", " world", "\n"]) |
- self.failUnless(clientF.data == expected, |
- "client didn't receive all the data it expected") |
- d = defer.gatherResults([wrappedF.onDisconnect, |
- wrappedClientF.onDisconnect]) |
- return d.addCallback(check) |
- |
- |
- def test_writeAfterShutdownWithoutReading(self): |
- """ |
- A TCP transport which is written to after the connection has been shut |
- down should notify its protocol that the connection has been lost, even |
- if the TCP transport is not actively being monitored for read events |
- (ie, pauseProducing was called on it). |
- """ |
- # This is an unpleasant thing. Generally tests shouldn't skip or |
- # run based on the name of the reactor being used (most tests |
- # shouldn't care _at all_ what reactor is being used, in fact). The |
- # Gtk reactor cannot pass this test, though, because it fails to |
- # implement IReactorTCP entirely correctly. Gtk is quite old at |
- # this point, so it's more likely that gtkreactor will be deprecated |
- # and removed rather than fixed to handle this case correctly. |
- # Since this is a pre-existing (and very long-standing) issue with |
- # the Gtk reactor, there's no reason for it to prevent this test |
- # being added to exercise the other reactors, for which the behavior |
- # was also untested but at least works correctly (now). See #2833 |
- # for information on the status of gtkreactor. |
- if reactor.__class__.__name__ == 'IOCPReactor': |
- raise unittest.SkipTest( |
- "iocpreactor does not, in fact, stop reading immediately after " |
- "pauseProducing is called. This results in a bonus disconnection " |
- "notification. Under some circumstances, it might be possible to " |
- "not receive this notifications (specifically, pauseProducing, " |
- "deliver some data, proceed with this test).") |
- if reactor.__class__.__name__ == 'GtkReactor': |
- raise unittest.SkipTest( |
- "gtkreactor does not implement unclean disconnection " |
- "notification correctly. This might more properly be " |
- "a todo, but due to technical limitations it cannot be.") |
- |
- # Called back after the protocol for the client side of the connection |
- # has paused its transport, preventing it from reading, therefore |
- # preventing it from noticing the disconnection before the rest of the |
- # actions which are necessary to trigger the case this test is for have |
- # been taken. |
- clientPaused = defer.Deferred() |
- |
- # Called back when the protocol for the server side of the connection |
- # has received connection lost notification. |
- serverLost = defer.Deferred() |
- |
- class Disconnecter(protocol.Protocol): |
- """ |
- Protocol for the server side of the connection which disconnects |
- itself in a callback on clientPaused and publishes notification |
- when its connection is actually lost. |
- """ |
- def connectionMade(self): |
- """ |
- Set up a callback on clientPaused to lose the connection. |
- """ |
- msg('Disconnector.connectionMade') |
- def disconnect(ignored): |
- msg('Disconnector.connectionMade disconnect') |
- self.transport.loseConnection() |
- msg('loseConnection called') |
- clientPaused.addCallback(disconnect) |
- |
- def connectionLost(self, reason): |
- """ |
- Notify observers that the server side of the connection has |
- ended. |
- """ |
- msg('Disconnecter.connectionLost') |
- serverLost.callback(None) |
- msg('serverLost called back') |
- |
- # Create the server port to which a connection will be made. |
- server = protocol.ServerFactory() |
- server.protocol = Disconnecter |
- port = reactor.listenTCP(0, server, interface='127.0.0.1') |
- self.addCleanup(port.stopListening) |
- addr = port.getHost() |
- |
- class Infinite(object): |
- """ |
- A producer which will write to its consumer as long as |
- resumeProducing is called. |
- |
- @ivar consumer: The L{IConsumer} which will be written to. |
- """ |
- implements(IPullProducer) |
- |
- def __init__(self, consumer): |
- self.consumer = consumer |
- |
- def resumeProducing(self): |
- msg('Infinite.resumeProducing') |
- self.consumer.write('x') |
- msg('Infinite.resumeProducing wrote to consumer') |
- |
- def stopProducing(self): |
- msg('Infinite.stopProducing') |
- |
- |
- class UnreadingWriter(protocol.Protocol): |
- """ |
- Trivial protocol which pauses its transport immediately and then |
- writes some bytes to it. |
- """ |
- def connectionMade(self): |
- msg('UnreadingWriter.connectionMade') |
- self.transport.pauseProducing() |
- clientPaused.callback(None) |
- msg('clientPaused called back') |
- def write(ignored): |
- msg('UnreadingWriter.connectionMade write') |
- # This needs to be enough bytes to spill over into the |
- # userspace Twisted send buffer - if it all fits into |
- # the kernel, Twisted won't even poll for OUT events, |
- # which means it won't poll for any events at all, so |
- # the disconnection is never noticed. This is due to |
- # #1662. When #1662 is fixed, this test will likely |
- # need to be adjusted, otherwise connection lost |
- # notification will happen too soon and the test will |
- # probably begin to fail with ConnectionDone instead of |
- # ConnectionLost (in any case, it will no longer be |
- # entirely correct). |
- producer = Infinite(self.transport) |
- msg('UnreadingWriter.connectionMade write created producer') |
- self.transport.registerProducer(producer, False) |
- msg('UnreadingWriter.connectionMade write registered producer') |
- serverLost.addCallback(write) |
- |
- # Create the client and initiate the connection |
- client = MyClientFactory() |
- client.protocolFactory = UnreadingWriter |
- clientConnectionLost = client.deferred |
- def cbClientLost(ignored): |
- msg('cbClientLost') |
- return client.lostReason |
- clientConnectionLost.addCallback(cbClientLost) |
- msg('Connecting to %s:%s' % (addr.host, addr.port)) |
- connector = reactor.connectTCP(addr.host, addr.port, client) |
- |
- # By the end of the test, the client should have received notification |
- # of unclean disconnection. |
- msg('Returning Deferred') |
- return self.assertFailure(clientConnectionLost, error.ConnectionLost) |
- |
- |
- |
-class ConnectionLosingProtocol(protocol.Protocol): |
- def connectionMade(self): |
- self.transport.write("1") |
- self.transport.loseConnection() |
- self.master._connectionMade() |
- self.master.ports.append(self.transport) |
- |
- |
- |
-class NoopProtocol(protocol.Protocol): |
- def connectionMade(self): |
- self.d = defer.Deferred() |
- self.master.serverConns.append(self.d) |
- |
- def connectionLost(self, reason): |
- self.d.callback(True) |
- |
- |
- |
-class ConnectionLostNotifyingProtocol(protocol.Protocol): |
- """ |
- Protocol which fires a Deferred which was previously passed to |
- its initializer when the connection is lost. |
- """ |
- def __init__(self, onConnectionLost): |
- self.onConnectionLost = onConnectionLost |
- |
- |
- def connectionLost(self, reason): |
- self.onConnectionLost.callback(self) |
- |
- |
- |
-class HandleSavingProtocol(ConnectionLostNotifyingProtocol): |
- """ |
- Protocol which grabs the platform-specific socket handle and |
- saves it as an attribute on itself when the connection is |
- established. |
- """ |
- def makeConnection(self, transport): |
- """ |
- Save the platform-specific socket handle for future |
- introspection. |
- """ |
- self.handle = transport.getHandle() |
- return protocol.Protocol.makeConnection(self, transport) |
- |
- |
- |
-class ProperlyCloseFilesMixin: |
- """ |
- Tests for platform resources properly being cleaned up. |
- """ |
- def createServer(self, address, portNumber, factory): |
- """ |
- Bind a server port to which connections will be made. The server |
- should use the given protocol factory. |
- |
- @return: The L{IListeningPort} for the server created. |
- """ |
- raise NotImplementedError() |
- |
- |
- def connectClient(self, address, portNumber, clientCreator): |
- """ |
- Establish a connection to the given address using the given |
- L{ClientCreator} instance. |
- |
- @return: A Deferred which will fire with the connected protocol instance. |
- """ |
- raise NotImplementedError() |
- |
- |
- def getHandleExceptionType(self): |
- """ |
- Return the exception class which will be raised when an operation is |
- attempted on a closed platform handle. |
- """ |
- raise NotImplementedError() |
- |
- |
- def getHandleErrorCode(self): |
- """ |
- Return the errno expected to result from writing to a closed |
- platform socket handle. |
- """ |
- # These platforms have been seen to give EBADF: |
- # |
- # Linux 2.4.26, Linux 2.6.15, OS X 10.4, FreeBSD 5.4 |
- # Windows 2000 SP 4, Windows XP SP 2 |
- return errno.EBADF |
- |
- |
- def test_properlyCloseFiles(self): |
- """ |
- Test that lost connections properly have their underlying socket |
- resources cleaned up. |
- """ |
- onServerConnectionLost = defer.Deferred() |
- serverFactory = protocol.ServerFactory() |
- serverFactory.protocol = lambda: ConnectionLostNotifyingProtocol( |
- onServerConnectionLost) |
- serverPort = self.createServer('127.0.0.1', 0, serverFactory) |
- |
- onClientConnectionLost = defer.Deferred() |
- serverAddr = serverPort.getHost() |
- clientCreator = protocol.ClientCreator( |
- reactor, lambda: HandleSavingProtocol(onClientConnectionLost)) |
- clientDeferred = self.connectClient( |
- serverAddr.host, serverAddr.port, clientCreator) |
- |
- def clientConnected(client): |
- """ |
- Disconnect the client. Return a Deferred which fires when both |
- the client and the server have received disconnect notification. |
- """ |
- client.transport.loseConnection() |
- return defer.gatherResults([ |
- onClientConnectionLost, onServerConnectionLost]) |
- clientDeferred.addCallback(clientConnected) |
- |
- def clientDisconnected((client, server)): |
- """ |
- Verify that the underlying platform socket handle has been |
- cleaned up. |
- """ |
- expectedErrorCode = self.getHandleErrorCode() |
- err = self.assertRaises( |
- self.getHandleExceptionType(), client.handle.send, 'bytes') |
- self.assertEqual(err.args[0], expectedErrorCode) |
- clientDeferred.addCallback(clientDisconnected) |
- |
- def cleanup(passthrough): |
- """ |
- Shut down the server port. Return a Deferred which fires when |
- this has completed. |
- """ |
- result = defer.maybeDeferred(serverPort.stopListening) |
- result.addCallback(lambda ign: passthrough) |
- return result |
- clientDeferred.addBoth(cleanup) |
- |
- return clientDeferred |
- |
- |
- |
-class ProperlyCloseFilesTestCase(unittest.TestCase, ProperlyCloseFilesMixin): |
- def createServer(self, address, portNumber, factory): |
- return reactor.listenTCP(portNumber, factory, interface=address) |
- |
- |
- def connectClient(self, address, portNumber, clientCreator): |
- return clientCreator.connectTCP(address, portNumber) |
- |
- |
- def getHandleExceptionType(self): |
- return socket.error |
- |
- |
- |
-class WiredForDeferreds(policies.ProtocolWrapper): |
- def __init__(self, factory, wrappedProtocol): |
- policies.ProtocolWrapper.__init__(self, factory, wrappedProtocol) |
- |
- def connectionMade(self): |
- policies.ProtocolWrapper.connectionMade(self) |
- self.factory.onConnect.callback(None) |
- |
- def connectionLost(self, reason): |
- policies.ProtocolWrapper.connectionLost(self, reason) |
- self.factory.onDisconnect.callback(None) |
- |
- |
- |
-class WiredFactory(policies.WrappingFactory): |
- protocol = WiredForDeferreds |
- |
- def __init__(self, wrappedFactory): |
- policies.WrappingFactory.__init__(self, wrappedFactory) |
- self.onConnect = defer.Deferred() |
- self.onDisconnect = defer.Deferred() |
- |
- |
- |
-class AddressTestCase(unittest.TestCase): |
- """ |
- Tests for address-related interactions with client and server protocols. |
- """ |
- def setUp(self): |
- """ |
- Create a port and connected client/server pair which can be used |
- to test factory behavior related to addresses. |
- |
- @return: A L{defer.Deferred} which will be called back when both the |
- client and server protocols have received their connection made |
- callback. |
- """ |
- class RememberingWrapper(protocol.ClientFactory): |
- """ |
- Simple wrapper factory which records the addresses which are |
- passed to its L{buildProtocol} method and delegates actual |
- protocol creation to another factory. |
- |
- @ivar addresses: A list of the objects passed to buildProtocol. |
- @ivar factory: The wrapped factory to which protocol creation is |
- delegated. |
- """ |
- def __init__(self, factory): |
- self.addresses = [] |
- self.factory = factory |
- |
- # Only bother to pass on buildProtocol calls to the wrapped |
- # factory - doStart, doStop, etc aren't necessary for this test |
- # to pass. |
- def buildProtocol(self, addr): |
- """ |
- Append the given address to C{self.addresses} and forward |
- the call to C{self.factory}. |
- """ |
- self.addresses.append(addr) |
- return self.factory.buildProtocol(addr) |
- |
- # Make a server which we can receive connection and disconnection |
- # notification for, and which will record the address passed to its |
- # buildProtocol. |
- self.server = MyServerFactory() |
- self.serverConnMade = self.server.protocolConnectionMade = defer.Deferred() |
- self.serverConnLost = self.server.protocolConnectionLost = defer.Deferred() |
- # RememberingWrapper is a ClientFactory, but ClientFactory is-a |
- # ServerFactory, so this is okay. |
- self.serverWrapper = RememberingWrapper(self.server) |
- |
- # Do something similar for a client. |
- self.client = MyClientFactory() |
- self.clientConnMade = self.client.protocolConnectionMade = defer.Deferred() |
- self.clientConnLost = self.client.protocolConnectionLost = defer.Deferred() |
- self.clientWrapper = RememberingWrapper(self.client) |
- |
- self.port = reactor.listenTCP(0, self.serverWrapper, interface='127.0.0.1') |
- self.connector = reactor.connectTCP( |
- self.port.getHost().host, self.port.getHost().port, self.clientWrapper) |
- |
- return defer.gatherResults([self.serverConnMade, self.clientConnMade]) |
- |
- |
- def tearDown(self): |
- """ |
- Disconnect the client/server pair and shutdown the port created in |
- L{setUp}. |
- """ |
- self.connector.disconnect() |
- return defer.gatherResults([ |
- self.serverConnLost, self.clientConnLost, |
- defer.maybeDeferred(self.port.stopListening)]) |
- |
- |
- def test_buildProtocolClient(self): |
- """ |
- L{ClientFactory.buildProtocol} should be invoked with the address of |
- the server to which a connection has been established, which should |
- be the same as the address reported by the C{getHost} method of the |
- transport of the server protocol and as the C{getPeer} method of the |
- transport of the client protocol. |
- """ |
- serverHost = self.server.protocol.transport.getHost() |
- clientPeer = self.client.protocol.transport.getPeer() |
- |
- self.assertEqual( |
- self.clientWrapper.addresses, |
- [IPv4Address('TCP', serverHost.host, serverHost.port)]) |
- self.assertEqual( |
- self.clientWrapper.addresses, |
- [IPv4Address('TCP', clientPeer.host, clientPeer.port)]) |
- |
- |
- def test_buildProtocolServer(self): |
- """ |
- L{ServerFactory.buildProtocol} should be invoked with the address of |
- the client which has connected to the port the factory is listening on, |
- which should be the same as the address reported by the C{getPeer} |
- method of the transport of the server protocol and as the C{getHost} |
- method of the transport of the client protocol. |
- """ |
- clientHost = self.client.protocol.transport.getHost() |
- serverPeer = self.server.protocol.transport.getPeer() |
- |
- self.assertEqual( |
- self.serverWrapper.addresses, |
- [IPv4Address('TCP', serverPeer.host, serverPeer.port)]) |
- self.assertEqual( |
- self.serverWrapper.addresses, |
- [IPv4Address('TCP', clientHost.host, clientHost.port)]) |
- |
- |
- |
-class LargeBufferWriterProtocol(protocol.Protocol): |
- |
- # Win32 sockets cannot handle single huge chunks of bytes. Write one |
- # massive string to make sure Twisted deals with this fact. |
- |
- def connectionMade(self): |
- # write 60MB |
- self.transport.write('X'*self.factory.len) |
- self.factory.done = 1 |
- self.transport.loseConnection() |
- |
-class LargeBufferReaderProtocol(protocol.Protocol): |
- def dataReceived(self, data): |
- self.factory.len += len(data) |
- def connectionLost(self, reason): |
- self.factory.done = 1 |
- |
-class LargeBufferReaderClientFactory(protocol.ClientFactory): |
- def __init__(self): |
- self.done = 0 |
- self.len = 0 |
- def buildProtocol(self, addr): |
- p = LargeBufferReaderProtocol() |
- p.factory = self |
- self.protocol = p |
- return p |
- |
- |
-class FireOnClose(policies.ProtocolWrapper): |
- """A wrapper around a protocol that makes it fire a deferred when |
- connectionLost is called. |
- """ |
- def connectionLost(self, reason): |
- policies.ProtocolWrapper.connectionLost(self, reason) |
- self.factory.deferred.callback(None) |
- |
- |
-class FireOnCloseFactory(policies.WrappingFactory): |
- protocol = FireOnClose |
- |
- def __init__(self, wrappedFactory): |
- policies.WrappingFactory.__init__(self, wrappedFactory) |
- self.deferred = defer.Deferred() |
- |
- |
-class LargeBufferTestCase(unittest.TestCase): |
- """Test that buffering large amounts of data works. |
- """ |
- |
- datalen = 60*1024*1024 |
- def testWriter(self): |
- f = protocol.Factory() |
- f.protocol = LargeBufferWriterProtocol |
- f.done = 0 |
- f.problem = 0 |
- f.len = self.datalen |
- wrappedF = FireOnCloseFactory(f) |
- p = reactor.listenTCP(0, wrappedF, interface="127.0.0.1") |
- self.addCleanup(p.stopListening) |
- n = p.getHost().port |
- clientF = LargeBufferReaderClientFactory() |
- wrappedClientF = FireOnCloseFactory(clientF) |
- reactor.connectTCP("127.0.0.1", n, wrappedClientF) |
- |
- d = defer.gatherResults([wrappedF.deferred, wrappedClientF.deferred]) |
- def check(ignored): |
- self.failUnless(f.done, "writer didn't finish, it probably died") |
- self.failUnless(clientF.len == self.datalen, |
- "client didn't receive all the data it expected " |
- "(%d != %d)" % (clientF.len, self.datalen)) |
- self.failUnless(clientF.done, |
- "client didn't see connection dropped") |
- return d.addCallback(check) |
- |
- |
-class MyHCProtocol(MyProtocol): |
- |
- implements(IHalfCloseableProtocol) |
- |
- readHalfClosed = False |
- writeHalfClosed = False |
- |
- def readConnectionLost(self): |
- self.readHalfClosed = True |
- # Invoke notification logic from the base class to simplify testing. |
- if self.writeHalfClosed: |
- self.connectionLost(None) |
- |
- def writeConnectionLost(self): |
- self.writeHalfClosed = True |
- # Invoke notification logic from the base class to simplify testing. |
- if self.readHalfClosed: |
- self.connectionLost(None) |
- |
- |
-class MyHCFactory(protocol.ServerFactory): |
- |
- called = 0 |
- protocolConnectionMade = None |
- |
- def buildProtocol(self, addr): |
- self.called += 1 |
- p = MyHCProtocol() |
- p.factory = self |
- self.protocol = p |
- return p |
- |
- |
-class HalfCloseTestCase(unittest.TestCase): |
- """Test half-closing connections.""" |
- |
- def setUp(self): |
- self.f = f = MyHCFactory() |
- self.p = p = reactor.listenTCP(0, f, interface="127.0.0.1") |
- self.addCleanup(p.stopListening) |
- d = loopUntil(lambda :p.connected) |
- |
- self.cf = protocol.ClientCreator(reactor, MyHCProtocol) |
- |
- d.addCallback(lambda _: self.cf.connectTCP(p.getHost().host, |
- p.getHost().port)) |
- d.addCallback(self._setUp) |
- return d |
- |
- def _setUp(self, client): |
- self.client = client |
- self.clientProtoConnectionLost = self.client.closedDeferred = defer.Deferred() |
- self.assertEquals(self.client.transport.connected, 1) |
- # Wait for the server to notice there is a connection, too. |
- return loopUntil(lambda: getattr(self.f, 'protocol', None) is not None) |
- |
- def tearDown(self): |
- self.assertEquals(self.client.closed, 0) |
- self.client.transport.loseConnection() |
- d = defer.maybeDeferred(self.p.stopListening) |
- d.addCallback(lambda ign: self.clientProtoConnectionLost) |
- d.addCallback(self._tearDown) |
- return d |
- |
- def _tearDown(self, ignored): |
- self.assertEquals(self.client.closed, 1) |
- # because we did half-close, the server also needs to |
- # closed explicitly. |
- self.assertEquals(self.f.protocol.closed, 0) |
- d = defer.Deferred() |
- def _connectionLost(reason): |
- self.f.protocol.closed = 1 |
- d.callback(None) |
- self.f.protocol.connectionLost = _connectionLost |
- self.f.protocol.transport.loseConnection() |
- d.addCallback(lambda x:self.assertEquals(self.f.protocol.closed, 1)) |
- return d |
- |
- def testCloseWriteCloser(self): |
- client = self.client |
- f = self.f |
- t = client.transport |
- |
- t.write("hello") |
- d = loopUntil(lambda :len(t._tempDataBuffer) == 0) |
- def loseWrite(ignored): |
- t.loseWriteConnection() |
- return loopUntil(lambda :t._writeDisconnected) |
- def check(ignored): |
- self.assertEquals(client.closed, False) |
- self.assertEquals(client.writeHalfClosed, True) |
- self.assertEquals(client.readHalfClosed, False) |
- return loopUntil(lambda :f.protocol.readHalfClosed) |
- def write(ignored): |
- w = client.transport.write |
- w(" world") |
- w("lalala fooled you") |
- self.assertEquals(0, len(client.transport._tempDataBuffer)) |
- self.assertEquals(f.protocol.data, "hello") |
- self.assertEquals(f.protocol.closed, False) |
- self.assertEquals(f.protocol.readHalfClosed, True) |
- return d.addCallback(loseWrite).addCallback(check).addCallback(write) |
- |
- def testWriteCloseNotification(self): |
- f = self.f |
- f.protocol.transport.loseWriteConnection() |
- |
- d = defer.gatherResults([ |
- loopUntil(lambda :f.protocol.writeHalfClosed), |
- loopUntil(lambda :self.client.readHalfClosed)]) |
- d.addCallback(lambda _: self.assertEquals( |
- f.protocol.readHalfClosed, False)) |
- return d |
- |
- |
-class HalfClose2TestCase(unittest.TestCase): |
- |
- def setUp(self): |
- self.f = f = MyServerFactory() |
- self.f.protocolConnectionMade = defer.Deferred() |
- self.p = p = reactor.listenTCP(0, f, interface="127.0.0.1") |
- |
- # XXX we don't test server side yet since we don't do it yet |
- d = protocol.ClientCreator(reactor, MyProtocol).connectTCP( |
- p.getHost().host, p.getHost().port) |
- d.addCallback(self._gotClient) |
- return d |
- |
- def _gotClient(self, client): |
- self.client = client |
- # Now wait for the server to catch up - it doesn't matter if this |
- # Deferred has already fired and gone away, in that case we'll |
- # return None and not wait at all, which is precisely correct. |
- return self.f.protocolConnectionMade |
- |
- def tearDown(self): |
- self.client.transport.loseConnection() |
- return self.p.stopListening() |
- |
- def testNoNotification(self): |
- """ |
- TCP protocols support half-close connections, but not all of them |
- support being notified of write closes. In this case, test that |
- half-closing the connection causes the peer's connection to be |
- closed. |
- """ |
- self.client.transport.write("hello") |
- self.client.transport.loseWriteConnection() |
- self.f.protocol.closedDeferred = d = defer.Deferred() |
- self.client.closedDeferred = d2 = defer.Deferred() |
- d.addCallback(lambda x: |
- self.assertEqual(self.f.protocol.data, 'hello')) |
- d.addCallback(lambda x: self.assertEqual(self.f.protocol.closed, True)) |
- return defer.gatherResults([d, d2]) |
- |
- def testShutdownException(self): |
- """ |
- If the other side has already closed its connection, |
- loseWriteConnection should pass silently. |
- """ |
- self.f.protocol.transport.loseConnection() |
- self.client.transport.write("X") |
- self.client.transport.loseWriteConnection() |
- self.f.protocol.closedDeferred = d = defer.Deferred() |
- self.client.closedDeferred = d2 = defer.Deferred() |
- d.addCallback(lambda x: |
- self.failUnlessEqual(self.f.protocol.closed, True)) |
- return defer.gatherResults([d, d2]) |
- |
- |
-class HalfCloseBuggyApplicationTests(unittest.TestCase): |
- """ |
- Test half-closing connections where notification code has bugs. |
- """ |
- |
- def setUp(self): |
- """ |
- Set up a server and connect a client to it. Return a Deferred which |
- only fires once this is done. |
- """ |
- self.serverFactory = MyHCFactory() |
- self.serverFactory.protocolConnectionMade = defer.Deferred() |
- self.port = reactor.listenTCP( |
- 0, self.serverFactory, interface="127.0.0.1") |
- self.addCleanup(self.port.stopListening) |
- addr = self.port.getHost() |
- creator = protocol.ClientCreator(reactor, MyHCProtocol) |
- clientDeferred = creator.connectTCP(addr.host, addr.port) |
- def setClient(clientProtocol): |
- self.clientProtocol = clientProtocol |
- clientDeferred.addCallback(setClient) |
- return defer.gatherResults([ |
- self.serverFactory.protocolConnectionMade, |
- clientDeferred]) |
- |
- |
- def aBug(self, *args): |
- """ |
- Fake implementation of a callback which illegally raises an |
- exception. |
- """ |
- raise RuntimeError("ONO I AM BUGGY CODE") |
- |
- |
- def _notificationRaisesTest(self): |
- """ |
- Helper for testing that an exception is logged by the time the |
- client protocol loses its connection. |
- """ |
- closed = self.clientProtocol.closedDeferred = defer.Deferred() |
- self.clientProtocol.transport.loseWriteConnection() |
- def check(ignored): |
- errors = self.flushLoggedErrors(RuntimeError) |
- self.assertEqual(len(errors), 1) |
- closed.addCallback(check) |
- return closed |
- |
- |
- def test_readNotificationRaises(self): |
- """ |
- If C{readConnectionLost} raises an exception when the transport |
- calls it to notify the protocol of that event, the exception should |
- be logged and the protocol should be disconnected completely. |
- """ |
- self.serverFactory.protocol.readConnectionLost = self.aBug |
- return self._notificationRaisesTest() |
- |
- |
- def test_writeNotificationRaises(self): |
- """ |
- If C{writeConnectionLost} raises an exception when the transport |
- calls it to notify the protocol of that event, the exception should |
- be logged and the protocol should be disconnected completely. |
- """ |
- self.clientProtocol.writeConnectionLost = self.aBug |
- return self._notificationRaisesTest() |
- |
- |
- |
-class LogTestCase(unittest.TestCase): |
- """ |
- Test logging facility of TCP base classes. |
- """ |
- |
- def test_logstrClientSetup(self): |
- """ |
- Check that the log customization of the client transport happens |
- once the client is connected. |
- """ |
- server = MyServerFactory() |
- |
- client = MyClientFactory() |
- client.protocolConnectionMade = defer.Deferred() |
- |
- port = reactor.listenTCP(0, server, interface='127.0.0.1') |
- self.addCleanup(port.stopListening) |
- |
- connector = reactor.connectTCP( |
- port.getHost().host, port.getHost().port, client) |
- self.addCleanup(connector.disconnect) |
- |
- # It should still have the default value |
- self.assertEquals(connector.transport.logstr, |
- "Uninitialized") |
- |
- def cb(ign): |
- self.assertEquals(connector.transport.logstr, |
- "MyProtocol,client") |
- client.protocolConnectionMade.addCallback(cb) |
- return client.protocolConnectionMade |
- |
- |
- |
-class PauseProducingTestCase(unittest.TestCase): |
- """ |
- Test some behaviors of pausing the production of a transport. |
- """ |
- |
- def test_pauseProducingInConnectionMade(self): |
- """ |
- In C{connectionMade} of a client protocol, C{pauseProducing} used to be |
- ignored: this test is here to ensure it's not ignored. |
- """ |
- server = MyServerFactory() |
- |
- client = MyClientFactory() |
- client.protocolConnectionMade = defer.Deferred() |
- |
- port = reactor.listenTCP(0, server, interface='127.0.0.1') |
- self.addCleanup(port.stopListening) |
- |
- connector = reactor.connectTCP( |
- port.getHost().host, port.getHost().port, client) |
- self.addCleanup(connector.disconnect) |
- |
- def checkInConnectionMade(proto): |
- tr = proto.transport |
- # The transport should already be monitored |
- self.assertIn(tr, reactor.getReaders() + |
- reactor.getWriters()) |
- proto.transport.pauseProducing() |
- self.assertNotIn(tr, reactor.getReaders() + |
- reactor.getWriters()) |
- d = defer.Deferred() |
- d.addCallback(checkAfterConnectionMade) |
- reactor.callLater(0, d.callback, proto) |
- return d |
- def checkAfterConnectionMade(proto): |
- tr = proto.transport |
- # The transport should still not be monitored |
- self.assertNotIn(tr, reactor.getReaders() + |
- reactor.getWriters()) |
- client.protocolConnectionMade.addCallback(checkInConnectionMade) |
- return client.protocolConnectionMade |
- |
- if not interfaces.IReactorFDSet.providedBy(reactor): |
- test_pauseProducingInConnectionMade.skip = "Reactor not providing IReactorFDSet" |
- |
- |
- |
-class CallBackOrderTestCase(unittest.TestCase): |
- """ |
- Test the order of reactor callbacks |
- """ |
- |
- def test_loseOrder(self): |
- """ |
- Check that Protocol.connectionLost is called before factory's |
- clientConnectionLost |
- """ |
- server = MyServerFactory() |
- server.protocolConnectionMade = (defer.Deferred() |
- .addCallback(lambda proto: self.addCleanup( |
- proto.transport.loseConnection))) |
- |
- client = MyClientFactory() |
- client.protocolConnectionLost = defer.Deferred() |
- client.protocolConnectionMade = defer.Deferred() |
- |
- def _cbCM(res): |
- """ |
- protocol.connectionMade callback |
- """ |
- reactor.callLater(0, client.protocol.transport.loseConnection) |
- |
- client.protocolConnectionMade.addCallback(_cbCM) |
- |
- port = reactor.listenTCP(0, server, interface='127.0.0.1') |
- self.addCleanup(port.stopListening) |
- |
- connector = reactor.connectTCP( |
- port.getHost().host, port.getHost().port, client) |
- self.addCleanup(connector.disconnect) |
- |
- def _cbCCL(res): |
- """ |
- factory.clientConnectionLost callback |
- """ |
- return 'CCL' |
- |
- def _cbCL(res): |
- """ |
- protocol.connectionLost callback |
- """ |
- return 'CL' |
- |
- def _cbGather(res): |
- self.assertEquals(res, ['CL', 'CCL']) |
- |
- d = defer.gatherResults([ |
- client.protocolConnectionLost.addCallback(_cbCL), |
- client.deferred.addCallback(_cbCCL)]) |
- return d.addCallback(_cbGather) |
- |
- |
- |
-try: |
- import resource |
-except ImportError: |
- pass |
-else: |
- numRounds = resource.getrlimit(resource.RLIMIT_NOFILE)[0] + 10 |
- ProperlyCloseFilesTestCase.numberRounds = numRounds |