Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(102)

Unified Diff: third_party/twisted_8_1/twisted/test/test_policies.py

Issue 12261012: Remove third_party/twisted_8_1 (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/build
Patch Set: Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: third_party/twisted_8_1/twisted/test/test_policies.py
diff --git a/third_party/twisted_8_1/twisted/test/test_policies.py b/third_party/twisted_8_1/twisted/test/test_policies.py
deleted file mode 100644
index 16f37e5c07dfcefb05f879c8afdb184e544611c6..0000000000000000000000000000000000000000
--- a/third_party/twisted_8_1/twisted/test/test_policies.py
+++ /dev/null
@@ -1,682 +0,0 @@
-# Copyright (c) 2001-2007 Twisted Matrix Laboratories.
-# See LICENSE for details.
-
-"""
-Test code for policies.
-"""
-
-from StringIO import StringIO
-
-from twisted.trial import unittest
-from twisted.test.proto_helpers import StringTransportWithDisconnection
-
-from twisted.internet import protocol, reactor, address, defer, task
-from twisted.protocols import policies
-
-
-
-class StringIOWithoutClosing(StringIO):
- def close(self): pass
-
-
-
-class SimpleProtocol(protocol.Protocol):
-
- connected = disconnected = 0
- buffer = ""
-
- def __init__(self):
- self.dConnected = defer.Deferred()
- self.dDisconnected = defer.Deferred()
-
- def connectionMade(self):
- self.connected = 1
- self.dConnected.callback('')
-
- def connectionLost(self, reason):
- self.disconnected = 1
- self.dDisconnected.callback('')
-
- def dataReceived(self, data):
- self.buffer += data
-
-
-
-class SillyFactory(protocol.ClientFactory):
-
- def __init__(self, p):
- self.p = p
-
- def buildProtocol(self, addr):
- return self.p
-
-
-class EchoProtocol(protocol.Protocol):
- paused = False
-
- def pauseProducing(self):
- self.paused = True
-
- def resumeProducing(self):
- self.paused = False
-
- def stopProducing(self):
- pass
-
- def dataReceived(self, data):
- self.transport.write(data)
-
-
-
-class Server(protocol.ServerFactory):
- """
- A simple server factory using L{EchoProtocol}.
- """
- protocol = EchoProtocol
-
-
-
-class TestableThrottlingFactory(policies.ThrottlingFactory):
- """
- L{policies.ThrottlingFactory} using a L{task.Clock} for tests.
- """
-
- def __init__(self, clock, *args, **kwargs):
- """
- @param clock: object providing a callLater method that can be used
- for tests.
- @type clock: C{task.Clock} or alike.
- """
- policies.ThrottlingFactory.__init__(self, *args, **kwargs)
- self.clock = clock
-
-
- def callLater(self, period, func):
- """
- Forward to the testable clock.
- """
- return self.clock.callLater(period, func)
-
-
-
-class TestableTimeoutFactory(policies.TimeoutFactory):
- """
- L{policies.TimeoutFactory} using a L{task.Clock} for tests.
- """
-
- def __init__(self, clock, *args, **kwargs):
- """
- @param clock: object providing a callLater method that can be used
- for tests.
- @type clock: C{task.Clock} or alike.
- """
- policies.TimeoutFactory.__init__(self, *args, **kwargs)
- self.clock = clock
-
-
- def callLater(self, period, func):
- """
- Forward to the testable clock.
- """
- return self.clock.callLater(period, func)
-
-
-
-class PausableStringTransport(StringTransportWithDisconnection):
- """
- A string transport saving the current production state.
-
- @ivar paused: whether the production is paused or not.
- @type paused: C{bool}
- """
- paused = False
-
- def pauseProducing(self):
- """
- Notification of production pause: set C{self.paused}.
- """
- self.paused = True
-
-
- def resumeProducing(self):
- """
- Notification of production restart: unset C{self.paused}.
- """
- self.paused = False
-
-
-
-class WrapperTestCase(unittest.TestCase):
- def testProtocolFactoryAttribute(self):
- """
- Make sure protocol.factory is the wrapped factory, not the wrapping
- factory.
- """
- f = Server()
- wf = policies.WrappingFactory(f)
- p = wf.buildProtocol(address.IPv4Address('TCP', '127.0.0.1', 35))
- self.assertIdentical(p.wrappedProtocol.factory, f)
-
-
-
-class WrappingFactory(policies.WrappingFactory):
- protocol = lambda s, f, p: p
-
- def startFactory(self):
- policies.WrappingFactory.startFactory(self)
- self.deferred.callback(None)
-
-
-
-class ThrottlingTestCase(unittest.TestCase):
- """
- Tests for L{policies.ThrottlingFactory}.
- """
-
- def test_limit(self):
- """
- Full test using a custom server limiting number of connections.
- """
- server = Server()
- c1, c2, c3, c4 = [SimpleProtocol() for i in range(4)]
- tServer = policies.ThrottlingFactory(server, 2)
- wrapTServer = WrappingFactory(tServer)
- wrapTServer.deferred = defer.Deferred()
-
- # Start listening
- p = reactor.listenTCP(0, wrapTServer, interface="127.0.0.1")
- n = p.getHost().port
-
- def _connect123(results):
- reactor.connectTCP("127.0.0.1", n, SillyFactory(c1))
- c1.dConnected.addCallback(
- lambda r: reactor.connectTCP("127.0.0.1", n, SillyFactory(c2)))
- c2.dConnected.addCallback(
- lambda r: reactor.connectTCP("127.0.0.1", n, SillyFactory(c3)))
- return c3.dDisconnected
-
- def _check123(results):
- self.assertEquals([c.connected for c in c1, c2, c3], [1, 1, 1])
- self.assertEquals([c.disconnected for c in c1, c2, c3], [0, 0, 1])
- self.assertEquals(len(tServer.protocols.keys()), 2)
- return results
-
- def _lose1(results):
- # disconnect one protocol and now another should be able to connect
- c1.transport.loseConnection()
- return c1.dDisconnected
-
- def _connect4(results):
- reactor.connectTCP("127.0.0.1", n, SillyFactory(c4))
- return c4.dConnected
-
- def _check4(results):
- self.assertEquals(c4.connected, 1)
- self.assertEquals(c4.disconnected, 0)
- return results
-
- def _cleanup(results):
- for c in c2, c4:
- c.transport.loseConnection()
- return defer.DeferredList([
- defer.maybeDeferred(p.stopListening),
- c2.dDisconnected,
- c4.dDisconnected])
-
- wrapTServer.deferred.addCallback(_connect123)
- wrapTServer.deferred.addCallback(_check123)
- wrapTServer.deferred.addCallback(_lose1)
- wrapTServer.deferred.addCallback(_connect4)
- wrapTServer.deferred.addCallback(_check4)
- wrapTServer.deferred.addCallback(_cleanup)
- return wrapTServer.deferred
-
-
- def test_writeLimit(self):
- """
- Check the writeLimit parameter: write data, and check for the pause
- status.
- """
- server = Server()
- tServer = TestableThrottlingFactory(task.Clock(), server, writeLimit=10)
- port = tServer.buildProtocol(address.IPv4Address('TCP', '127.0.0.1', 0))
- tr = PausableStringTransport()
- tr.protocol = port
- port.makeConnection(tr)
- port.producer = port.wrappedProtocol
-
- port.dataReceived("0123456789")
- port.dataReceived("abcdefghij")
- self.assertEquals(tr.value(), "0123456789abcdefghij")
- self.assertEquals(tServer.writtenThisSecond, 20)
- self.assertFalse(port.wrappedProtocol.paused)
-
- # at this point server should've written 20 bytes, 10 bytes
- # above the limit so writing should be paused around 1 second
- # from 'now', and resumed a second after that
- tServer.clock.advance(1.05)
- self.assertEquals(tServer.writtenThisSecond, 0)
- self.assertTrue(port.wrappedProtocol.paused)
-
- tServer.clock.advance(1.05)
- self.assertEquals(tServer.writtenThisSecond, 0)
- self.assertFalse(port.wrappedProtocol.paused)
-
-
- def test_readLimit(self):
- """
- Check the readLimit parameter: read data and check for the pause
- status.
- """
- server = Server()
- tServer = TestableThrottlingFactory(task.Clock(), server, readLimit=10)
- port = tServer.buildProtocol(address.IPv4Address('TCP', '127.0.0.1', 0))
- tr = PausableStringTransport()
- tr.protocol = port
- port.makeConnection(tr)
-
- port.dataReceived("0123456789")
- port.dataReceived("abcdefghij")
- self.assertEquals(tr.value(), "0123456789abcdefghij")
- self.assertEquals(tServer.readThisSecond, 20)
-
- tServer.clock.advance(1.05)
- self.assertEquals(tServer.readThisSecond, 0)
- self.assertTrue(tr.paused)
-
- tServer.clock.advance(1.05)
- self.assertEquals(tServer.readThisSecond, 0)
- self.assertFalse(tr.paused)
-
- tr.clear()
- port.dataReceived("0123456789")
- port.dataReceived("abcdefghij")
- self.assertEquals(tr.value(), "0123456789abcdefghij")
- self.assertEquals(tServer.readThisSecond, 20)
-
- tServer.clock.advance(1.05)
- self.assertEquals(tServer.readThisSecond, 0)
- self.assertTrue(tr.paused)
-
- tServer.clock.advance(1.05)
- self.assertEquals(tServer.readThisSecond, 0)
- self.assertFalse(tr.paused)
-
-
-
-class TimeoutTestCase(unittest.TestCase):
- """
- Tests for L{policies.TimeoutFactory}.
- """
-
- def setUp(self):
- """
- Create a testable, deterministic clock, and a set of
- server factory/protocol/transport.
- """
- self.clock = task.Clock()
- wrappedFactory = protocol.ServerFactory()
- wrappedFactory.protocol = SimpleProtocol
- self.factory = TestableTimeoutFactory(self.clock, wrappedFactory, 3)
- self.proto = self.factory.buildProtocol(
- address.IPv4Address('TCP', '127.0.0.1', 12345))
- self.transport = StringTransportWithDisconnection()
- self.transport.protocol = self.proto
- self.proto.makeConnection(self.transport)
-
-
- def test_timeout(self):
- """
- Make sure that when a TimeoutFactory accepts a connection, it will
- time out that connection if no data is read or written within the
- timeout period.
- """
- # Let almost 3 time units pass
- self.clock.pump([0.0, 0.5, 1.0, 1.0, 0.4])
- self.failIf(self.proto.wrappedProtocol.disconnected)
-
- # Now let the timer elapse
- self.clock.pump([0.0, 0.2])
- self.failUnless(self.proto.wrappedProtocol.disconnected)
-
-
- def test_sendAvoidsTimeout(self):
- """
- Make sure that writing data to a transport from a protocol
- constructed by a TimeoutFactory resets the timeout countdown.
- """
- # Let half the countdown period elapse
- self.clock.pump([0.0, 0.5, 1.0])
- self.failIf(self.proto.wrappedProtocol.disconnected)
-
- # Send some data (self.proto is the /real/ proto's transport, so this
- # is the write that gets called)
- self.proto.write('bytes bytes bytes')
-
- # More time passes, putting us past the original timeout
- self.clock.pump([0.0, 1.0, 1.0])
- self.failIf(self.proto.wrappedProtocol.disconnected)
-
- # Make sure writeSequence delays timeout as well
- self.proto.writeSequence(['bytes'] * 3)
-
- # Tick tock
- self.clock.pump([0.0, 1.0, 1.0])
- self.failIf(self.proto.wrappedProtocol.disconnected)
-
- # Don't write anything more, just let the timeout expire
- self.clock.pump([0.0, 2.0])
- self.failUnless(self.proto.wrappedProtocol.disconnected)
-
-
- def test_receiveAvoidsTimeout(self):
- """
- Make sure that receiving data also resets the timeout countdown.
- """
- # Let half the countdown period elapse
- self.clock.pump([0.0, 1.0, 0.5])
- self.failIf(self.proto.wrappedProtocol.disconnected)
-
- # Some bytes arrive, they should reset the counter
- self.proto.dataReceived('bytes bytes bytes')
-
- # We pass the original timeout
- self.clock.pump([0.0, 1.0, 1.0])
- self.failIf(self.proto.wrappedProtocol.disconnected)
-
- # Nothing more arrives though, the new timeout deadline is passed,
- # the connection should be dropped.
- self.clock.pump([0.0, 1.0, 1.0])
- self.failUnless(self.proto.wrappedProtocol.disconnected)
-
-
-
-class TimeoutTester(protocol.Protocol, policies.TimeoutMixin):
- """
- A testable protocol with timeout facility.
-
- @ivar timedOut: set to C{True} if a timeout has been detected.
- @type timedOut: C{bool}
- """
- timeOut = 3
- timedOut = False
-
- def __init__(self, clock):
- """
- Initialize the protocol with a C{task.Clock} object.
- """
- self.clock = clock
-
-
- def connectionMade(self):
- """
- Upon connection, set the timeout.
- """
- self.setTimeout(self.timeOut)
-
-
- def dataReceived(self, data):
- """
- Reset the timeout on data.
- """
- self.resetTimeout()
- protocol.Protocol.dataReceived(self, data)
-
-
- def connectionLost(self, reason=None):
- """
- On connection lost, cancel all timeout operations.
- """
- self.setTimeout(None)
-
-
- def timeoutConnection(self):
- """
- Flags the timedOut variable to indicate the timeout of the connection.
- """
- self.timedOut = True
-
-
- def callLater(self, timeout, func, *args, **kwargs):
- """
- Override callLater to use the deterministic clock.
- """
- return self.clock.callLater(timeout, func, *args, **kwargs)
-
-
-
-class TestTimeout(unittest.TestCase):
- """
- Tests for L{policies.TimeoutMixin}.
- """
-
- def setUp(self):
- """
- Create a testable, deterministic clock and a C{TimeoutTester} instance.
- """
- self.clock = task.Clock()
- self.proto = TimeoutTester(self.clock)
-
-
- def test_overriddenCallLater(self):
- """
- Test that the callLater of the clock is used instead of
- C{reactor.callLater}.
- """
- self.proto.setTimeout(10)
- self.assertEquals(len(self.clock.calls), 1)
-
-
- def test_timeout(self):
- """
- Check that the protocol does timeout at the time specified by its
- C{timeOut} attribute.
- """
- s = StringIOWithoutClosing()
- self.proto.makeConnection(protocol.FileWrapper(s))
-
- # timeOut value is 3
- self.clock.pump([0, 0.5, 1.0, 1.0])
- self.failIf(self.proto.timedOut)
- self.clock.pump([0, 1.0])
- self.failUnless(self.proto.timedOut)
-
-
- def test_noTimeout(self):
- """
- Check that receiving data is delaying the timeout of the connection.
- """
- s = StringIOWithoutClosing()
- self.proto.makeConnection(protocol.FileWrapper(s))
-
- self.clock.pump([0, 0.5, 1.0, 1.0])
- self.failIf(self.proto.timedOut)
- self.proto.dataReceived('hello there')
- self.clock.pump([0, 1.0, 1.0, 0.5])
- self.failIf(self.proto.timedOut)
- self.clock.pump([0, 1.0])
- self.failUnless(self.proto.timedOut)
-
-
- def test_resetTimeout(self):
- """
- Check that setting a new value for timeout cancel the previous value
- and install a new timeout.
- """
- self.proto.timeOut = None
- s = StringIOWithoutClosing()
- self.proto.makeConnection(protocol.FileWrapper(s))
-
- self.proto.setTimeout(1)
- self.assertEquals(self.proto.timeOut, 1)
-
- self.clock.pump([0, 0.9])
- self.failIf(self.proto.timedOut)
- self.clock.pump([0, 0.2])
- self.failUnless(self.proto.timedOut)
-
-
- def test_cancelTimeout(self):
- """
- Setting the timeout to C{None} cancel any timeout operations.
- """
- self.proto.timeOut = 5
- s = StringIOWithoutClosing()
- self.proto.makeConnection(protocol.FileWrapper(s))
-
- self.proto.setTimeout(None)
- self.assertEquals(self.proto.timeOut, None)
-
- self.clock.pump([0, 5, 5, 5])
- self.failIf(self.proto.timedOut)
-
-
- def test_return(self):
- """
- setTimeout should return the value of the previous timeout.
- """
- self.proto.timeOut = 5
-
- self.assertEquals(self.proto.setTimeout(10), 5)
- self.assertEquals(self.proto.setTimeout(None), 10)
- self.assertEquals(self.proto.setTimeout(1), None)
- self.assertEquals(self.proto.timeOut, 1)
-
- # Clean up the DelayedCall
- self.proto.setTimeout(None)
-
-
-
-class LimitTotalConnectionsFactoryTestCase(unittest.TestCase):
- """Tests for policies.LimitTotalConnectionsFactory"""
- def testConnectionCounting(self):
- # Make a basic factory
- factory = policies.LimitTotalConnectionsFactory()
- factory.protocol = protocol.Protocol
-
- # connectionCount starts at zero
- self.assertEqual(0, factory.connectionCount)
-
- # connectionCount increments as connections are made
- p1 = factory.buildProtocol(None)
- self.assertEqual(1, factory.connectionCount)
- p2 = factory.buildProtocol(None)
- self.assertEqual(2, factory.connectionCount)
-
- # and decrements as they are lost
- p1.connectionLost(None)
- self.assertEqual(1, factory.connectionCount)
- p2.connectionLost(None)
- self.assertEqual(0, factory.connectionCount)
-
- def testConnectionLimiting(self):
- # Make a basic factory with a connection limit of 1
- factory = policies.LimitTotalConnectionsFactory()
- factory.protocol = protocol.Protocol
- factory.connectionLimit = 1
-
- # Make a connection
- p = factory.buildProtocol(None)
- self.assertNotEqual(None, p)
- self.assertEqual(1, factory.connectionCount)
-
- # Try to make a second connection, which will exceed the connection
- # limit. This should return None, because overflowProtocol is None.
- self.assertEqual(None, factory.buildProtocol(None))
- self.assertEqual(1, factory.connectionCount)
-
- # Define an overflow protocol
- class OverflowProtocol(protocol.Protocol):
- def connectionMade(self):
- factory.overflowed = True
- factory.overflowProtocol = OverflowProtocol
- factory.overflowed = False
-
- # Try to make a second connection again, now that we have an overflow
- # protocol. Note that overflow connections count towards the connection
- # count.
- op = factory.buildProtocol(None)
- op.makeConnection(None) # to trigger connectionMade
- self.assertEqual(True, factory.overflowed)
- self.assertEqual(2, factory.connectionCount)
-
- # Close the connections.
- p.connectionLost(None)
- self.assertEqual(1, factory.connectionCount)
- op.connectionLost(None)
- self.assertEqual(0, factory.connectionCount)
-
-
-class WriteSequenceEchoProtocol(EchoProtocol):
- def dataReceived(self, bytes):
- if bytes.find('vector!') != -1:
- self.transport.writeSequence([bytes])
- else:
- EchoProtocol.dataReceived(self, bytes)
-
-class TestLoggingFactory(policies.TrafficLoggingFactory):
- openFile = None
- def open(self, name):
- assert self.openFile is None, "open() called too many times"
- self.openFile = StringIO()
- return self.openFile
-
-
-
-class LoggingFactoryTestCase(unittest.TestCase):
- """
- Tests for L{policies.TrafficLoggingFactory}.
- """
-
- def test_thingsGetLogged(self):
- """
- Check the output produced by L{policies.TrafficLoggingFactory}.
- """
- wrappedFactory = Server()
- wrappedFactory.protocol = WriteSequenceEchoProtocol
- t = StringTransportWithDisconnection()
- f = TestLoggingFactory(wrappedFactory, 'test')
- p = f.buildProtocol(('1.2.3.4', 5678))
- t.protocol = p
- p.makeConnection(t)
-
- v = f.openFile.getvalue()
- self.failUnless('*' in v, "* not found in %r" % (v,))
- self.failIf(t.value())
-
- p.dataReceived('here are some bytes')
-
- v = f.openFile.getvalue()
- self.assertIn("C 1: 'here are some bytes'", v)
- self.assertIn("S 1: 'here are some bytes'", v)
- self.assertEquals(t.value(), 'here are some bytes')
-
- t.clear()
- p.dataReceived('prepare for vector! to the extreme')
- v = f.openFile.getvalue()
- self.assertIn("SV 1: ['prepare for vector! to the extreme']", v)
- self.assertEquals(t.value(), 'prepare for vector! to the extreme')
-
- p.loseConnection()
-
- v = f.openFile.getvalue()
- self.assertIn('ConnectionDone', v)
-
-
- def test_counter(self):
- """
- Test counter management with the resetCounter method.
- """
- wrappedFactory = Server()
- f = TestLoggingFactory(wrappedFactory, 'test')
- self.assertEqual(f._counter, 0)
- f.buildProtocol(('1.2.3.4', 5678))
- self.assertEqual(f._counter, 1)
- # Reset log file
- f.openFile = None
- f.buildProtocol(('1.2.3.4', 5679))
- self.assertEqual(f._counter, 2)
-
- f.resetCounter()
- self.assertEqual(f._counter, 0)
-
« no previous file with comments | « third_party/twisted_8_1/twisted/test/test_plugin.py ('k') | third_party/twisted_8_1/twisted/test/test_postfix.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698