| Index: third_party/twisted_8_1/twisted/test/test_protocols.py
|
| diff --git a/third_party/twisted_8_1/twisted/test/test_protocols.py b/third_party/twisted_8_1/twisted/test/test_protocols.py
|
| deleted file mode 100644
|
| index 8fc8ae5318e023bb919c74d6c6ef945e2d3cd966..0000000000000000000000000000000000000000
|
| --- a/third_party/twisted_8_1/twisted/test/test_protocols.py
|
| +++ /dev/null
|
| @@ -1,725 +0,0 @@
|
| -# Copyright (c) 2001-2007 Twisted Matrix Laboratories.
|
| -# See LICENSE for details.
|
| -
|
| -
|
| -"""
|
| -Test cases for twisted.protocols package.
|
| -"""
|
| -
|
| -from twisted.trial import unittest
|
| -from twisted.protocols import basic, wire, portforward
|
| -from twisted.internet import reactor, protocol, defer, task, error
|
| -from twisted.test import proto_helpers
|
| -
|
| -import struct
|
| -import StringIO
|
| -
|
| -class StringIOWithoutClosing(StringIO.StringIO):
|
| - """
|
| - A StringIO that can't be closed.
|
| - """
|
| - def close(self):
|
| - """
|
| - Do nothing.
|
| - """
|
| -
|
| -class LineTester(basic.LineReceiver):
|
| - """
|
| - A line receiver that parses data received and make actions on some tokens.
|
| -
|
| - @type delimiter: C{str}
|
| - @ivar delimiter: character used between received lines.
|
| - @type MAX_LENGTH: C{int}
|
| - @ivar MAX_LENGTH: size of a line when C{lineLengthExceeded} will be called.
|
| - @type clock: L{twisted.internet.task.Clock}
|
| - @ivar clock: clock simulating reactor callLater. Pass it to constructor if
|
| - you want to use the pause/rawpause functionalities.
|
| - """
|
| -
|
| - delimiter = '\n'
|
| - MAX_LENGTH = 64
|
| -
|
| - def __init__(self, clock=None):
|
| - """
|
| - If given, use a clock to make callLater calls.
|
| - """
|
| - self.clock = clock
|
| -
|
| - def connectionMade(self):
|
| - """
|
| - Create/clean data received on connection.
|
| - """
|
| - self.received = []
|
| -
|
| - def lineReceived(self, line):
|
| - """
|
| - Receive line and make some action for some tokens: pause, rawpause,
|
| - stop, len, produce, unproduce.
|
| - """
|
| - self.received.append(line)
|
| - if line == '':
|
| - self.setRawMode()
|
| - elif line == 'pause':
|
| - self.pauseProducing()
|
| - self.clock.callLater(0, self.resumeProducing)
|
| - elif line == 'rawpause':
|
| - self.pauseProducing()
|
| - self.setRawMode()
|
| - self.received.append('')
|
| - self.clock.callLater(0, self.resumeProducing)
|
| - elif line == 'stop':
|
| - self.stopProducing()
|
| - elif line[:4] == 'len ':
|
| - self.length = int(line[4:])
|
| - elif line.startswith('produce'):
|
| - self.transport.registerProducer(self, False)
|
| - elif line.startswith('unproduce'):
|
| - self.transport.unregisterProducer()
|
| -
|
| - def rawDataReceived(self, data):
|
| - """
|
| - Read raw data, until the quantity specified by a previous 'len' line is
|
| - reached.
|
| - """
|
| - data, rest = data[:self.length], data[self.length:]
|
| - self.length = self.length - len(data)
|
| - self.received[-1] = self.received[-1] + data
|
| - if self.length == 0:
|
| - self.setLineMode(rest)
|
| -
|
| - def lineLengthExceeded(self, line):
|
| - """
|
| - Adjust line mode when long lines received.
|
| - """
|
| - if len(line) > self.MAX_LENGTH + 1:
|
| - self.setLineMode(line[self.MAX_LENGTH + 1:])
|
| -
|
| -
|
| -class LineOnlyTester(basic.LineOnlyReceiver):
|
| - """
|
| - A buffering line only receiver.
|
| - """
|
| - delimiter = '\n'
|
| - MAX_LENGTH = 64
|
| -
|
| - def connectionMade(self):
|
| - """
|
| - Create/clean data received on connection.
|
| - """
|
| - self.received = []
|
| -
|
| - def lineReceived(self, line):
|
| - """
|
| - Save received data.
|
| - """
|
| - self.received.append(line)
|
| -
|
| -class WireTestCase(unittest.TestCase):
|
| - """
|
| - Test wire protocols.
|
| - """
|
| - def testEcho(self):
|
| - """
|
| - Test wire.Echo protocol: send some data and check it send it back.
|
| - """
|
| - t = StringIOWithoutClosing()
|
| - a = wire.Echo()
|
| - a.makeConnection(protocol.FileWrapper(t))
|
| - a.dataReceived("hello")
|
| - a.dataReceived("world")
|
| - a.dataReceived("how")
|
| - a.dataReceived("are")
|
| - a.dataReceived("you")
|
| - self.failUnlessEqual(t.getvalue(), "helloworldhowareyou")
|
| -
|
| - def testWho(self):
|
| - """
|
| - Test wire.Who protocol.
|
| - """
|
| - t = StringIOWithoutClosing()
|
| - a = wire.Who()
|
| - a.makeConnection(protocol.FileWrapper(t))
|
| - self.failUnlessEqual(t.getvalue(), "root\r\n")
|
| -
|
| - def testQOTD(self):
|
| - """
|
| - Test wire.QOTD protocol.
|
| - """
|
| - t = StringIOWithoutClosing()
|
| - a = wire.QOTD()
|
| - a.makeConnection(protocol.FileWrapper(t))
|
| - self.failUnlessEqual(t.getvalue(),
|
| - "An apple a day keeps the doctor away.\r\n")
|
| -
|
| - def testDiscard(self):
|
| - """
|
| - Test wire.Discard protocol.
|
| - """
|
| - t = StringIOWithoutClosing()
|
| - a = wire.Discard()
|
| - a.makeConnection(protocol.FileWrapper(t))
|
| - a.dataReceived("hello")
|
| - a.dataReceived("world")
|
| - a.dataReceived("how")
|
| - a.dataReceived("are")
|
| - a.dataReceived("you")
|
| - self.failUnlessEqual(t.getvalue(), "")
|
| -
|
| -class LineReceiverTestCase(unittest.TestCase):
|
| - """
|
| - Test LineReceiver, using the C{LineTester} wrapper.
|
| - """
|
| - buffer = '''\
|
| -len 10
|
| -
|
| -0123456789len 5
|
| -
|
| -1234
|
| -len 20
|
| -foo 123
|
| -
|
| -0123456789
|
| -012345678len 0
|
| -foo 5
|
| -
|
| -1234567890123456789012345678901234567890123456789012345678901234567890
|
| -len 1
|
| -
|
| -a'''
|
| -
|
| - output = ['len 10', '0123456789', 'len 5', '1234\n',
|
| - 'len 20', 'foo 123', '0123456789\n012345678',
|
| - 'len 0', 'foo 5', '', '67890', 'len 1', 'a']
|
| -
|
| - def testBuffer(self):
|
| - """
|
| - Test buffering for different packet size, checking received matches
|
| - expected data.
|
| - """
|
| - for packet_size in range(1, 10):
|
| - t = StringIOWithoutClosing()
|
| - a = LineTester()
|
| - a.makeConnection(protocol.FileWrapper(t))
|
| - for i in range(len(self.buffer)/packet_size + 1):
|
| - s = self.buffer[i*packet_size:(i+1)*packet_size]
|
| - a.dataReceived(s)
|
| - self.failUnlessEqual(self.output, a.received)
|
| -
|
| -
|
| - pause_buf = 'twiddle1\ntwiddle2\npause\ntwiddle3\n'
|
| -
|
| - pause_output1 = ['twiddle1', 'twiddle2', 'pause']
|
| - pause_output2 = pause_output1+['twiddle3']
|
| -
|
| - def testPausing(self):
|
| - """
|
| - Test pause inside data receiving. It uses fake clock to see if
|
| - pausing/resuming work.
|
| - """
|
| - for packet_size in range(1, 10):
|
| - t = StringIOWithoutClosing()
|
| - clock = task.Clock()
|
| - a = LineTester(clock)
|
| - a.makeConnection(protocol.FileWrapper(t))
|
| - for i in range(len(self.pause_buf)/packet_size + 1):
|
| - s = self.pause_buf[i*packet_size:(i+1)*packet_size]
|
| - a.dataReceived(s)
|
| - self.failUnlessEqual(self.pause_output1, a.received)
|
| - clock.advance(0)
|
| - self.failUnlessEqual(self.pause_output2, a.received)
|
| -
|
| - rawpause_buf = 'twiddle1\ntwiddle2\nlen 5\nrawpause\n12345twiddle3\n'
|
| -
|
| - rawpause_output1 = ['twiddle1', 'twiddle2', 'len 5', 'rawpause', '']
|
| - rawpause_output2 = ['twiddle1', 'twiddle2', 'len 5', 'rawpause', '12345',
|
| - 'twiddle3']
|
| -
|
| - def testRawPausing(self):
|
| - """
|
| - Test pause inside raw date receiving.
|
| - """
|
| - for packet_size in range(1, 10):
|
| - t = StringIOWithoutClosing()
|
| - clock = task.Clock()
|
| - a = LineTester(clock)
|
| - a.makeConnection(protocol.FileWrapper(t))
|
| - for i in range(len(self.rawpause_buf)/packet_size + 1):
|
| - s = self.rawpause_buf[i*packet_size:(i+1)*packet_size]
|
| - a.dataReceived(s)
|
| - self.failUnlessEqual(self.rawpause_output1, a.received)
|
| - clock.advance(0)
|
| - self.failUnlessEqual(self.rawpause_output2, a.received)
|
| -
|
| - stop_buf = 'twiddle1\ntwiddle2\nstop\nmore\nstuff\n'
|
| -
|
| - stop_output = ['twiddle1', 'twiddle2', 'stop']
|
| -
|
| - def testStopProducing(self):
|
| - """
|
| - Test stop inside producing.
|
| - """
|
| - for packet_size in range(1, 10):
|
| - t = StringIOWithoutClosing()
|
| - a = LineTester()
|
| - a.makeConnection(protocol.FileWrapper(t))
|
| - for i in range(len(self.stop_buf)/packet_size + 1):
|
| - s = self.stop_buf[i*packet_size:(i+1)*packet_size]
|
| - a.dataReceived(s)
|
| - self.failUnlessEqual(self.stop_output, a.received)
|
| -
|
| -
|
| - def testLineReceiverAsProducer(self):
|
| - """
|
| - Test produce/unproduce in receiving.
|
| - """
|
| - a = LineTester()
|
| - t = StringIOWithoutClosing()
|
| - a.makeConnection(protocol.FileWrapper(t))
|
| - a.dataReceived('produce\nhello world\nunproduce\ngoodbye\n')
|
| - self.assertEquals(a.received,
|
| - ['produce', 'hello world', 'unproduce', 'goodbye'])
|
| -
|
| -
|
| -class LineOnlyReceiverTestCase(unittest.TestCase):
|
| - """
|
| - Test line only receiveer.
|
| - """
|
| - buffer = """foo
|
| - bleakness
|
| - desolation
|
| - plastic forks
|
| - """
|
| -
|
| - def testBuffer(self):
|
| - """
|
| - Test buffering over line protocol: data received should match buffer.
|
| - """
|
| - t = StringIOWithoutClosing()
|
| - a = LineOnlyTester()
|
| - a.makeConnection(protocol.FileWrapper(t))
|
| - for c in self.buffer:
|
| - a.dataReceived(c)
|
| - self.failUnlessEqual(a.received, self.buffer.split('\n')[:-1])
|
| -
|
| - def testLineTooLong(self):
|
| - """
|
| - Test sending a line too long: it should close the connection.
|
| - """
|
| - t = StringIOWithoutClosing()
|
| - a = LineOnlyTester()
|
| - a.makeConnection(protocol.FileWrapper(t))
|
| - res = a.dataReceived('x'*200)
|
| - self.assertTrue(isinstance(res, error.ConnectionLost))
|
| -
|
| -
|
| -
|
| -class TestMixin:
|
| -
|
| - def connectionMade(self):
|
| - self.received = []
|
| -
|
| - def stringReceived(self, s):
|
| - self.received.append(s)
|
| -
|
| - MAX_LENGTH = 50
|
| - closed = 0
|
| -
|
| - def connectionLost(self, reason):
|
| - self.closed = 1
|
| -
|
| -
|
| -class TestNetstring(TestMixin, basic.NetstringReceiver):
|
| - pass
|
| -
|
| -
|
| -class LPTestCaseMixin:
|
| -
|
| - illegalStrings = []
|
| - protocol = None
|
| -
|
| - def getProtocol(self):
|
| - t = StringIOWithoutClosing()
|
| - a = self.protocol()
|
| - a.makeConnection(protocol.FileWrapper(t))
|
| - return a
|
| -
|
| - def test_illegal(self):
|
| - """
|
| - Assert that illegal strings cause the transport to be closed.
|
| - """
|
| - for s in self.illegalStrings:
|
| - r = self.getProtocol()
|
| - for c in s:
|
| - r.dataReceived(c)
|
| - self.assertEquals(r.transport.closed, 1)
|
| -
|
| -
|
| -class NetstringReceiverTestCase(unittest.TestCase, LPTestCaseMixin):
|
| -
|
| - strings = ['hello', 'world', 'how', 'are', 'you123', ':today', "a"*515]
|
| -
|
| - illegalStrings = [
|
| - '9999999999999999999999', 'abc', '4:abcde',
|
| - '51:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaab,',]
|
| -
|
| - protocol = TestNetstring
|
| -
|
| - def testBuffer(self):
|
| - for packet_size in range(1, 10):
|
| - t = StringIOWithoutClosing()
|
| - a = TestNetstring()
|
| - a.MAX_LENGTH = 699
|
| - a.makeConnection(protocol.FileWrapper(t))
|
| - for s in self.strings:
|
| - a.sendString(s)
|
| - out = t.getvalue()
|
| - for i in range(len(out)/packet_size + 1):
|
| - s = out[i*packet_size:(i+1)*packet_size]
|
| - if s:
|
| - a.dataReceived(s)
|
| - self.assertEquals(a.received, self.strings)
|
| -
|
| -
|
| -class IntNTestCaseMixin(LPTestCaseMixin):
|
| - """
|
| - TestCase mixin for int-prefixed protocols.
|
| - """
|
| -
|
| - protocol = None
|
| - strings = None
|
| - illegalStrings = None
|
| - partialStrings = None
|
| -
|
| - def test_receive(self):
|
| - """
|
| - Test receiving data find the same data send.
|
| - """
|
| - r = self.getProtocol()
|
| - for s in self.strings:
|
| - for c in struct.pack(self.protocol.structFormat,len(s)) + s:
|
| - r.dataReceived(c)
|
| - self.assertEquals(r.received, self.strings)
|
| -
|
| - def test_partial(self):
|
| - """
|
| - Send partial data, nothing should be definitely received.
|
| - """
|
| - for s in self.partialStrings:
|
| - r = self.getProtocol()
|
| - for c in s:
|
| - r.dataReceived(c)
|
| - self.assertEquals(r.received, [])
|
| -
|
| - def test_send(self):
|
| - """
|
| - Test sending data over protocol.
|
| - """
|
| - r = self.getProtocol()
|
| - r.sendString("b" * 16)
|
| - self.assertEquals(r.transport.file.getvalue(),
|
| - struct.pack(self.protocol.structFormat, 16) + "b" * 16)
|
| -
|
| -
|
| -class TestInt32(TestMixin, basic.Int32StringReceiver):
|
| - """
|
| - A L{basic.Int32StringReceiver} storing received strings in an array.
|
| -
|
| - @ivar received: array holding received strings.
|
| - """
|
| -
|
| -
|
| -class Int32TestCase(unittest.TestCase, IntNTestCaseMixin):
|
| - """
|
| - Test case for int32-prefixed protocol
|
| - """
|
| - protocol = TestInt32
|
| - strings = ["a", "b" * 16]
|
| - illegalStrings = ["\x10\x00\x00\x00aaaaaa"]
|
| - partialStrings = ["\x00\x00\x00", "hello there", ""]
|
| -
|
| - def test_data(self):
|
| - """
|
| - Test specific behavior of the 32-bits length.
|
| - """
|
| - r = self.getProtocol()
|
| - r.sendString("foo")
|
| - self.assertEquals(r.transport.file.getvalue(), "\x00\x00\x00\x03foo")
|
| - r.dataReceived("\x00\x00\x00\x04ubar")
|
| - self.assertEquals(r.received, ["ubar"])
|
| -
|
| -
|
| -class TestInt16(TestMixin, basic.Int16StringReceiver):
|
| - """
|
| - A L{basic.Int16StringReceiver} storing received strings in an array.
|
| -
|
| - @ivar received: array holding received strings.
|
| - """
|
| -
|
| -
|
| -class Int16TestCase(unittest.TestCase, IntNTestCaseMixin):
|
| - """
|
| - Test case for int16-prefixed protocol
|
| - """
|
| - protocol = TestInt16
|
| - strings = ["a", "b" * 16]
|
| - illegalStrings = ["\x10\x00aaaaaa"]
|
| - partialStrings = ["\x00", "hello there", ""]
|
| -
|
| - def test_data(self):
|
| - """
|
| - Test specific behavior of the 16-bits length.
|
| - """
|
| - r = self.getProtocol()
|
| - r.sendString("foo")
|
| - self.assertEquals(r.transport.file.getvalue(), "\x00\x03foo")
|
| - r.dataReceived("\x00\x04ubar")
|
| - self.assertEquals(r.received, ["ubar"])
|
| -
|
| - def test_tooLongSend(self):
|
| - """
|
| - Send too much data: that should cause an error.
|
| - """
|
| - r = self.getProtocol()
|
| - tooSend = "b" * (2**(r.prefixLength*8) + 1)
|
| - self.assertRaises(AssertionError, r.sendString, tooSend)
|
| -
|
| -
|
| -class TestInt8(TestMixin, basic.Int8StringReceiver):
|
| - """
|
| - A L{basic.Int8StringReceiver} storing received strings in an array.
|
| -
|
| - @ivar received: array holding received strings.
|
| - """
|
| -
|
| -
|
| -class Int8TestCase(unittest.TestCase, IntNTestCaseMixin):
|
| - """
|
| - Test case for int8-prefixed protocol
|
| - """
|
| - protocol = TestInt8
|
| - strings = ["a", "b" * 16]
|
| - illegalStrings = ["\x00\x00aaaaaa"]
|
| - partialStrings = ["\x08", "dzadz", ""]
|
| -
|
| - def test_data(self):
|
| - """
|
| - Test specific behavior of the 8-bits length.
|
| - """
|
| - r = self.getProtocol()
|
| - r.sendString("foo")
|
| - self.assertEquals(r.transport.file.getvalue(), "\x03foo")
|
| - r.dataReceived("\x04ubar")
|
| - self.assertEquals(r.received, ["ubar"])
|
| -
|
| - def test_tooLongSend(self):
|
| - """
|
| - Send too much data: that should cause an error.
|
| - """
|
| - r = self.getProtocol()
|
| - tooSend = "b" * (2**(r.prefixLength*8) + 1)
|
| - self.assertRaises(AssertionError, r.sendString, tooSend)
|
| -
|
| -
|
| -class OnlyProducerTransport(object):
|
| - # Transport which isn't really a transport, just looks like one to
|
| - # someone not looking very hard.
|
| -
|
| - paused = False
|
| - disconnecting = False
|
| -
|
| - def __init__(self):
|
| - self.data = []
|
| -
|
| - def pauseProducing(self):
|
| - self.paused = True
|
| -
|
| - def resumeProducing(self):
|
| - self.paused = False
|
| -
|
| - def write(self, bytes):
|
| - self.data.append(bytes)
|
| -
|
| -
|
| -class ConsumingProtocol(basic.LineReceiver):
|
| - # Protocol that really, really doesn't want any more bytes.
|
| -
|
| - def lineReceived(self, line):
|
| - self.transport.write(line)
|
| - self.pauseProducing()
|
| -
|
| -
|
| -class ProducerTestCase(unittest.TestCase):
|
| - def testPauseResume(self):
|
| - p = ConsumingProtocol()
|
| - t = OnlyProducerTransport()
|
| - p.makeConnection(t)
|
| -
|
| - p.dataReceived('hello, ')
|
| - self.failIf(t.data)
|
| - self.failIf(t.paused)
|
| - self.failIf(p.paused)
|
| -
|
| - p.dataReceived('world\r\n')
|
| -
|
| - self.assertEquals(t.data, ['hello, world'])
|
| - self.failUnless(t.paused)
|
| - self.failUnless(p.paused)
|
| -
|
| - p.resumeProducing()
|
| -
|
| - self.failIf(t.paused)
|
| - self.failIf(p.paused)
|
| -
|
| - p.dataReceived('hello\r\nworld\r\n')
|
| -
|
| - self.assertEquals(t.data, ['hello, world', 'hello'])
|
| - self.failUnless(t.paused)
|
| - self.failUnless(p.paused)
|
| -
|
| - p.resumeProducing()
|
| - p.dataReceived('goodbye\r\n')
|
| -
|
| - self.assertEquals(t.data, ['hello, world', 'hello', 'world'])
|
| - self.failUnless(t.paused)
|
| - self.failUnless(p.paused)
|
| -
|
| - p.resumeProducing()
|
| -
|
| - self.assertEquals(t.data, ['hello, world', 'hello', 'world', 'goodbye'])
|
| - self.failUnless(t.paused)
|
| - self.failUnless(p.paused)
|
| -
|
| - p.resumeProducing()
|
| -
|
| - self.assertEquals(t.data, ['hello, world', 'hello', 'world', 'goodbye'])
|
| - self.failIf(t.paused)
|
| - self.failIf(p.paused)
|
| -
|
| -
|
| -
|
| -class TestableProxyClientFactory(portforward.ProxyClientFactory):
|
| - """
|
| - Test proxy client factory that keeps the last created protocol instance.
|
| -
|
| - @ivar protoInstance: the last instance of the protocol.
|
| - @type protoInstance: L{portforward.ProxyClient}
|
| - """
|
| -
|
| - def buildProtocol(self, addr):
|
| - """
|
| - Create the protocol instance and keeps track of it.
|
| - """
|
| - proto = portforward.ProxyClientFactory.buildProtocol(self, addr)
|
| - self.protoInstance = proto
|
| - return proto
|
| -
|
| -
|
| -
|
| -class TestableProxyFactory(portforward.ProxyFactory):
|
| - """
|
| - Test proxy factory that keeps the last created protocol instance.
|
| -
|
| - @ivar protoInstance: the last instance of the protocol.
|
| - @type protoInstance: L{portforward.ProxyServer}
|
| -
|
| - @ivar clientFactoryInstance: client factory used by C{protoInstance} to
|
| - create forward connections.
|
| - @type clientFactoryInstance: L{TestableProxyClientFactory}
|
| - """
|
| -
|
| - def buildProtocol(self, addr):
|
| - """
|
| - Create the protocol instance, keeps track of it, and makes it use
|
| - C{clientFactoryInstance} as client factory.
|
| - """
|
| - proto = portforward.ProxyFactory.buildProtocol(self, addr)
|
| - self.clientFactoryInstance = TestableProxyClientFactory()
|
| - # Force the use of this specific instance
|
| - proto.clientProtocolFactory = lambda: self.clientFactoryInstance
|
| - self.protoInstance = proto
|
| - return proto
|
| -
|
| -
|
| -
|
| -class Portforwarding(unittest.TestCase):
|
| - """
|
| - Test port forwarding.
|
| - """
|
| -
|
| - def setUp(self):
|
| - self.serverProtocol = wire.Echo()
|
| - self.clientProtocol = protocol.Protocol()
|
| - self.openPorts = []
|
| -
|
| -
|
| - def tearDown(self):
|
| - try:
|
| - self.proxyServerFactory.protoInstance.transport.loseConnection()
|
| - except AttributeError:
|
| - pass
|
| - try:
|
| - self.proxyServerFactory.clientFactoryInstance.protoInstance.transport.loseConnection()
|
| - except AttributeError:
|
| - pass
|
| - try:
|
| - self.clientProtocol.transport.loseConnection()
|
| - except AttributeError:
|
| - pass
|
| - try:
|
| - self.serverProtocol.transport.loseConnection()
|
| - except AttributeError:
|
| - pass
|
| - return defer.gatherResults(
|
| - [defer.maybeDeferred(p.stopListening) for p in self.openPorts])
|
| -
|
| -
|
| - def test_portforward(self):
|
| - """
|
| - Test port forwarding through Echo protocol.
|
| - """
|
| - realServerFactory = protocol.ServerFactory()
|
| - realServerFactory.protocol = lambda: self.serverProtocol
|
| - realServerPort = reactor.listenTCP(0, realServerFactory,
|
| - interface='127.0.0.1')
|
| - self.openPorts.append(realServerPort)
|
| - self.proxyServerFactory = TestableProxyFactory('127.0.0.1',
|
| - realServerPort.getHost().port)
|
| - proxyServerPort = reactor.listenTCP(0, self.proxyServerFactory,
|
| - interface='127.0.0.1')
|
| - self.openPorts.append(proxyServerPort)
|
| -
|
| - nBytes = 1000
|
| - received = []
|
| - d = defer.Deferred()
|
| - def testDataReceived(data):
|
| - received.extend(data)
|
| - if len(received) >= nBytes:
|
| - self.assertEquals(''.join(received), 'x' * nBytes)
|
| - d.callback(None)
|
| - self.clientProtocol.dataReceived = testDataReceived
|
| -
|
| - def testConnectionMade():
|
| - self.clientProtocol.transport.write('x' * nBytes)
|
| - self.clientProtocol.connectionMade = testConnectionMade
|
| -
|
| - clientFactory = protocol.ClientFactory()
|
| - clientFactory.protocol = lambda: self.clientProtocol
|
| -
|
| - reactor.connectTCP(
|
| - '127.0.0.1', proxyServerPort.getHost().port, clientFactory)
|
| -
|
| - return d
|
| -
|
| -
|
| -
|
| -class StringTransportTestCase(unittest.TestCase):
|
| - """
|
| - Test L{proto_helpers.StringTransport} helper behaviour.
|
| - """
|
| -
|
| - def test_noUnicode(self):
|
| - """
|
| - Test that L{proto_helpers.StringTransport} doesn't accept unicode data.
|
| - """
|
| - s = proto_helpers.StringTransport()
|
| - self.assertRaises(TypeError, s.write, u'foo')
|
|
|