Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(197)

Unified Diff: third_party/twisted_8_1/twisted/test/test_amp.py

Issue 12261012: Remove third_party/twisted_8_1 (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/build
Patch Set: Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: third_party/twisted_8_1/twisted/test/test_amp.py
diff --git a/third_party/twisted_8_1/twisted/test/test_amp.py b/third_party/twisted_8_1/twisted/test/test_amp.py
deleted file mode 100644
index 50271220065f055c52ea6781bc1ff6ea4bd1e73c..0000000000000000000000000000000000000000
--- a/third_party/twisted_8_1/twisted/test/test_amp.py
+++ /dev/null
@@ -1,2115 +0,0 @@
-# Copyright (c) 2005 Divmod, Inc.
-# Copyright (c) 2007 Twisted Matrix Laboratories.
-# See LICENSE for details.
-
-from twisted.python import filepath
-from twisted.python.failure import Failure
-from twisted.protocols import amp
-from twisted.test import iosim
-from twisted.trial import unittest
-from twisted.internet import protocol, defer, error, reactor, interfaces
-
-
-class TestProto(protocol.Protocol):
- def __init__(self, onConnLost, dataToSend):
- self.onConnLost = onConnLost
- self.dataToSend = dataToSend
-
- def connectionMade(self):
- self.data = []
- self.transport.write(self.dataToSend)
-
- def dataReceived(self, bytes):
- self.data.append(bytes)
- # self.transport.loseConnection()
-
- def connectionLost(self, reason):
- self.onConnLost.callback(self.data)
-
-
-
-class SimpleSymmetricProtocol(amp.AMP):
-
- def sendHello(self, text):
- return self.callRemoteString(
- "hello",
- hello=text)
-
- def amp_HELLO(self, box):
- return amp.Box(hello=box['hello'])
-
- def amp_HOWDOYOUDO(self, box):
- return amp.QuitBox(howdoyoudo='world')
-
-
-
-class UnfriendlyGreeting(Exception):
- """Greeting was insufficiently kind.
- """
-
-class DeathThreat(Exception):
- """Greeting was insufficiently kind.
- """
-
-class UnknownProtocol(Exception):
- """Asked to switch to the wrong protocol.
- """
-
-
-class TransportPeer(amp.Argument):
- # this serves as some informal documentation for how to get variables from
- # the protocol or your environment and pass them to methods as arguments.
- def retrieve(self, d, name, proto):
- return ''
-
- def fromStringProto(self, notAString, proto):
- return proto.transport.getPeer()
-
- def toBox(self, name, strings, objects, proto):
- return
-
-
-
-class Hello(amp.Command):
-
- commandName = 'hello'
-
- arguments = [('hello', amp.String()),
- ('optional', amp.Boolean(optional=True)),
- ('print', amp.Unicode(optional=True)),
- ('from', TransportPeer(optional=True)),
- ('mixedCase', amp.String(optional=True)),
- ('dash-arg', amp.String(optional=True)),
- ('underscore_arg', amp.String(optional=True))]
-
- response = [('hello', amp.String()),
- ('print', amp.Unicode(optional=True))]
-
- errors = {UnfriendlyGreeting: 'UNFRIENDLY'}
-
- fatalErrors = {DeathThreat: 'DEAD'}
-
-class NoAnswerHello(Hello):
- commandName = Hello.commandName
- requiresAnswer = False
-
-class FutureHello(amp.Command):
- commandName = 'hello'
-
- arguments = [('hello', amp.String()),
- ('optional', amp.Boolean(optional=True)),
- ('print', amp.Unicode(optional=True)),
- ('from', TransportPeer(optional=True)),
- ('bonus', amp.String(optional=True)), # addt'l arguments
- # should generally be
- # added at the end, and
- # be optional...
- ]
-
- response = [('hello', amp.String()),
- ('print', amp.Unicode(optional=True))]
-
- errors = {UnfriendlyGreeting: 'UNFRIENDLY'}
-
-class WTF(amp.Command):
- """
- An example of an invalid command.
- """
-
-
-class BrokenReturn(amp.Command):
- """ An example of a perfectly good command, but the handler is going to return
- None...
- """
-
- commandName = 'broken_return'
-
-class Goodbye(amp.Command):
- # commandName left blank on purpose: this tests implicit command names.
- response = [('goodbye', amp.String())]
- responseType = amp.QuitBox
-
-class Howdoyoudo(amp.Command):
- commandName = 'howdoyoudo'
- # responseType = amp.QuitBox
-
-class WaitForever(amp.Command):
- commandName = 'wait_forever'
-
-class GetList(amp.Command):
- commandName = 'getlist'
- arguments = [('length', amp.Integer())]
- response = [('body', amp.AmpList([('x', amp.Integer())]))]
-
-class SecuredPing(amp.Command):
- # XXX TODO: actually make this refuse to send over an insecure connection
- response = [('pinged', amp.Boolean())]
-
-class TestSwitchProto(amp.ProtocolSwitchCommand):
- commandName = 'Switch-Proto'
-
- arguments = [
- ('name', amp.String()),
- ]
- errors = {UnknownProtocol: 'UNKNOWN'}
-
-class SingleUseFactory(protocol.ClientFactory):
- def __init__(self, proto):
- self.proto = proto
- self.proto.factory = self
-
- def buildProtocol(self, addr):
- p, self.proto = self.proto, None
- return p
-
- reasonFailed = None
-
- def clientConnectionFailed(self, connector, reason):
- self.reasonFailed = reason
- return
-
-THING_I_DONT_UNDERSTAND = 'gwebol nargo'
-class ThingIDontUnderstandError(Exception):
- pass
-
-class FactoryNotifier(amp.AMP):
- factory = None
- def connectionMade(self):
- if self.factory is not None:
- self.factory.theProto = self
- if hasattr(self.factory, 'onMade'):
- self.factory.onMade.callback(None)
-
- def emitpong(self):
- from twisted.internet.interfaces import ISSLTransport
- if not ISSLTransport.providedBy(self.transport):
- raise DeathThreat("only send secure pings over secure channels")
- return {'pinged': True}
- SecuredPing.responder(emitpong)
-
-
-class SimpleSymmetricCommandProtocol(FactoryNotifier):
- maybeLater = None
- def __init__(self, onConnLost=None):
- amp.AMP.__init__(self)
- self.onConnLost = onConnLost
-
- def sendHello(self, text):
- return self.callRemote(Hello, hello=text)
-
- def sendUnicodeHello(self, text, translation):
- return self.callRemote(Hello, hello=text, Print=translation)
-
- greeted = False
-
- def cmdHello(self, hello, From, optional=None, Print=None,
- mixedCase=None, dash_arg=None, underscore_arg=None):
- assert From == self.transport.getPeer()
- if hello == THING_I_DONT_UNDERSTAND:
- raise ThingIDontUnderstandError()
- if hello.startswith('fuck'):
- raise UnfriendlyGreeting("Don't be a dick.")
- if hello == 'die':
- raise DeathThreat("aieeeeeeeee")
- result = dict(hello=hello)
- if Print is not None:
- result.update(dict(Print=Print))
- self.greeted = True
- return result
- Hello.responder(cmdHello)
-
- def cmdGetlist(self, length):
- return {'body': [dict(x=1)] * length}
- GetList.responder(cmdGetlist)
-
- def waitforit(self):
- self.waiting = defer.Deferred()
- return self.waiting
- WaitForever.responder(waitforit)
-
- def howdo(self):
- return dict(howdoyoudo='world')
- Howdoyoudo.responder(howdo)
-
- def saybye(self):
- return dict(goodbye="everyone")
- Goodbye.responder(saybye)
-
- def switchToTestProtocol(self, fail=False):
- if fail:
- name = 'no-proto'
- else:
- name = 'test-proto'
- p = TestProto(self.onConnLost, SWITCH_CLIENT_DATA)
- return self.callRemote(
- TestSwitchProto,
- SingleUseFactory(p), name=name).addCallback(lambda ign: p)
-
- def switchit(self, name):
- if name == 'test-proto':
- return TestProto(self.onConnLost, SWITCH_SERVER_DATA)
- raise UnknownProtocol(name)
- TestSwitchProto.responder(switchit)
-
- def donothing(self):
- return None
- BrokenReturn.responder(donothing)
-
-
-class DeferredSymmetricCommandProtocol(SimpleSymmetricCommandProtocol):
- def switchit(self, name):
- if name == 'test-proto':
- self.maybeLaterProto = TestProto(self.onConnLost, SWITCH_SERVER_DATA)
- self.maybeLater = defer.Deferred()
- return self.maybeLater
- raise UnknownProtocol(name)
- TestSwitchProto.responder(switchit)
-
-class BadNoAnswerCommandProtocol(SimpleSymmetricCommandProtocol):
- def badResponder(self, hello, From, optional=None, Print=None,
- mixedCase=None, dash_arg=None, underscore_arg=None):
- """
- This responder does nothing and forgets to return a dictionary.
- """
- NoAnswerHello.responder(badResponder)
-
-class NoAnswerCommandProtocol(SimpleSymmetricCommandProtocol):
- def goodNoAnswerResponder(self, hello, From, optional=None, Print=None,
- mixedCase=None, dash_arg=None, underscore_arg=None):
- return dict(hello=hello+"-noanswer")
- NoAnswerHello.responder(goodNoAnswerResponder)
-
-def connectedServerAndClient(ServerClass=SimpleSymmetricProtocol,
- ClientClass=SimpleSymmetricProtocol,
- *a, **kw):
- """Returns a 3-tuple: (client, server, pump)
- """
- return iosim.connectedServerAndClient(
- ServerClass, ClientClass,
- *a, **kw)
-
-class TotallyDumbProtocol(protocol.Protocol):
- buf = ''
- def dataReceived(self, data):
- self.buf += data
-
-class LiteralAmp(amp.AMP):
- def __init__(self):
- self.boxes = []
-
- def ampBoxReceived(self, box):
- self.boxes.append(box)
- return
-
-class ParsingTest(unittest.TestCase):
-
- def test_booleanValues(self):
- """
- Verify that the Boolean parser parses 'True' and 'False', but nothing
- else.
- """
- b = amp.Boolean()
- self.assertEquals(b.fromString("True"), True)
- self.assertEquals(b.fromString("False"), False)
- self.assertRaises(TypeError, b.fromString, "ninja")
- self.assertRaises(TypeError, b.fromString, "true")
- self.assertRaises(TypeError, b.fromString, "TRUE")
- self.assertEquals(b.toString(True), 'True')
- self.assertEquals(b.toString(False), 'False')
-
- def test_pathValueRoundTrip(self):
- """
- Verify the 'Path' argument can parse and emit a file path.
- """
- fp = filepath.FilePath(self.mktemp())
- p = amp.Path()
- s = p.toString(fp)
- v = p.fromString(s)
- self.assertNotIdentical(fp, v) # sanity check
- self.assertEquals(fp, v)
-
-
- def test_sillyEmptyThing(self):
- """
- Test that empty boxes raise an error; they aren't supposed to be sent
- on purpose.
- """
- a = amp.AMP()
- return self.assertRaises(amp.NoEmptyBoxes, a.ampBoxReceived, amp.Box())
-
-
- def test_ParsingRoundTrip(self):
- """
- Verify that various kinds of data make it through the encode/parse
- round-trip unharmed.
- """
- c, s, p = connectedServerAndClient(ClientClass=LiteralAmp,
- ServerClass=LiteralAmp)
-
- SIMPLE = ('simple', 'test')
- CE = ('ceq', ': ')
- CR = ('crtest', 'test\r')
- LF = ('lftest', 'hello\n')
- NEWLINE = ('newline', 'test\r\none\r\ntwo')
- NEWLINE2 = ('newline2', 'test\r\none\r\n two')
- BLANKLINE = ('newline3', 'test\r\n\r\nblank\r\n\r\nline')
- BODYTEST = ('body', 'blah\r\n\r\ntesttest')
-
- testData = [
- [SIMPLE],
- [SIMPLE, BODYTEST],
- [SIMPLE, CE],
- [SIMPLE, CR],
- [SIMPLE, CE, CR, LF],
- [CE, CR, LF],
- [SIMPLE, NEWLINE, CE, NEWLINE2],
- [BODYTEST, SIMPLE, NEWLINE]
- ]
-
- for test in testData:
- jb = amp.Box()
- jb.update(dict(test))
- jb._sendTo(c)
- p.flush()
- self.assertEquals(s.boxes[-1], jb)
-
-
-
-class FakeLocator(object):
- """
- This is a fake implementation of the interface implied by
- L{CommandLocator}.
- """
- def __init__(self):
- """
- Remember the given keyword arguments as a set of responders.
- """
- self.commands = {}
-
-
- def locateResponder(self, commandName):
- """
- Look up and return a function passed as a keyword argument of the given
- name to the constructor.
- """
- return self.commands[commandName]
-
-
-class FakeSender:
- """
- This is a fake implementation of the 'box sender' interface implied by
- L{AMP}.
- """
- def __init__(self):
- """
- Create a fake sender and initialize the list of received boxes and
- unhandled errors.
- """
- self.sentBoxes = []
- self.unhandledErrors = []
- self.expectedErrors = 0
-
-
- def expectError(self):
- """
- Expect one error, so that the test doesn't fail.
- """
- self.expectedErrors += 1
-
-
- def sendBox(self, box):
- """
- Accept a box, but don't do anything.
- """
- self.sentBoxes.append(box)
-
-
- def unhandledError(self, failure):
- """
- Deal with failures by instantly re-raising them for easier debugging.
- """
- self.expectedErrors -= 1
- if self.expectedErrors < 0:
- failure.raiseException()
- else:
- self.unhandledErrors.append(failure)
-
-
-
-class CommandDispatchTests(unittest.TestCase):
- """
- The AMP CommandDispatcher class dispatches converts AMP boxes into commands
- and responses using Command.responder decorator.
-
- Note: Originally, AMP's factoring was such that many tests for this
- functionality are now implemented as full round-trip tests in L{AMPTest}.
- Future tests should be written at this level instead, to ensure API
- compatibility and to provide more granular, readable units of test
- coverage.
- """
-
- def setUp(self):
- """
- Create a dispatcher to use.
- """
- self.locator = FakeLocator()
- self.sender = FakeSender()
- self.dispatcher = amp.BoxDispatcher(self.locator)
- self.dispatcher.startReceivingBoxes(self.sender)
-
-
- def test_receivedAsk(self):
- """
- L{CommandDispatcher.ampBoxReceived} should locate the appropriate
- command in its responder lookup, based on the '_ask' key.
- """
- received = []
- def thunk(box):
- received.append(box)
- return amp.Box({"hello": "goodbye"})
- input = amp.Box(_command="hello",
- _ask="test-command-id",
- hello="world")
- self.locator.commands['hello'] = thunk
- self.dispatcher.ampBoxReceived(input)
- self.assertEquals(received, [input])
-
-
- def test_sendUnhandledError(self):
- """
- L{CommandDispatcher} should relay its unhandled errors in responding to
- boxes to its boxSender.
- """
- err = RuntimeError("something went wrong, oh no")
- self.sender.expectError()
- self.dispatcher.unhandledError(Failure(err))
- self.assertEqual(len(self.sender.unhandledErrors), 1)
- self.assertEqual(self.sender.unhandledErrors[0].value, err)
-
-
- def test_unhandledSerializationError(self):
- """
- Errors during serialization ought to be relayed to the sender's
- unhandledError method.
- """
- err = RuntimeError("something undefined went wrong")
- def thunk(result):
- class BrokenBox(amp.Box):
- def _sendTo(self, proto):
- raise err
- return BrokenBox()
- self.locator.commands['hello'] = thunk
- input = amp.Box(_command="hello",
- _ask="test-command-id",
- hello="world")
- self.sender.expectError()
- self.dispatcher.ampBoxReceived(input)
- self.assertEquals(len(self.sender.unhandledErrors), 1)
- self.assertEquals(self.sender.unhandledErrors[0].value, err)
-
-
- def test_callRemote(self):
- """
- L{CommandDispatcher.callRemote} should emit a properly formatted '_ask'
- box to its boxSender and record an outstanding L{Deferred}. When a
- corresponding '_answer' packet is received, the L{Deferred} should be
- fired, and the results translated via the given L{Command}'s response
- de-serialization.
- """
- D = self.dispatcher.callRemote(Hello, hello='world')
- self.assertEquals(self.sender.sentBoxes,
- [amp.AmpBox(_command="hello",
- _ask="1",
- hello="world")])
- answers = []
- D.addCallback(answers.append)
- self.assertEquals(answers, [])
- self.dispatcher.ampBoxReceived(amp.AmpBox({'hello': "yay",
- 'print': "ignored",
- '_answer': "1"}))
- self.assertEquals(answers, [dict(hello="yay",
- Print=u"ignored")])
-
-
-class SimpleGreeting(amp.Command):
- """
- A very simple greeting command that uses a few basic argument types.
- """
- commandName = 'simple'
- arguments = [('greeting', amp.Unicode()),
- ('cookie', amp.Integer())]
- response = [('cookieplus', amp.Integer())]
-
-
-class TestLocator(amp.CommandLocator):
- """
- A locator which implements a responder to a 'hello' command.
- """
- def __init__(self):
- self.greetings = []
-
-
- def greetingResponder(self, greeting, cookie):
- self.greetings.append((greeting, cookie))
- return dict(cookieplus=cookie + 3)
- greetingResponder = SimpleGreeting.responder(greetingResponder)
-
-
-
-class OverrideLocatorAMP(amp.AMP):
- def __init__(self):
- amp.AMP.__init__(self)
- self.customResponder = object()
- self.expectations = {"custom": self.customResponder}
- self.greetings = []
-
-
- def lookupFunction(self, name):
- """
- Override the deprecated lookupFunction function.
- """
- if name in self.expectations:
- result = self.expectations[name]
- return result
- else:
- return super(OverrideLocatorAMP, self).lookupFunction(name)
-
-
- def greetingResponder(self, greeting, cookie):
- self.greetings.append((greeting, cookie))
- return dict(cookieplus=cookie + 3)
- greetingResponder = SimpleGreeting.responder(greetingResponder)
-
-
-
-
-class CommandLocatorTests(unittest.TestCase):
- """
- The CommandLocator should enable users to specify responders to commands as
- functions that take structured objects, annotated with metadata.
- """
-
- def test_responderDecorator(self):
- """
- A method on a L{CommandLocator} subclass decorated with a L{Command}
- subclass's L{responder} decorator should be returned from
- locateResponder, wrapped in logic to serialize and deserialize its
- arguments.
- """
- locator = TestLocator()
- responderCallable = locator.locateResponder("simple")
- result = responderCallable(amp.Box(greeting="ni hao", cookie="5"))
- def done(values):
- self.assertEquals(values, amp.AmpBox(cookieplus='8'))
- return result.addCallback(done)
-
-
- def test_lookupFunctionDeprecatedOverride(self):
- """
- Subclasses which override locateResponder under its old name,
- lookupFunction, should have the override invoked instead. (This tests
- an AMP subclass, because in the version of the code that could invoke
- this deprecated code path, there was no L{CommandLocator}.)
- """
- locator = OverrideLocatorAMP()
- customResponderObject = self.assertWarns(
- PendingDeprecationWarning,
- "Override locateResponder, not lookupFunction.",
- __file__, lambda : locator.locateResponder("custom"))
- self.assertEquals(locator.customResponder, customResponderObject)
- # Make sure upcalling works too
- normalResponderObject = self.assertWarns(
- PendingDeprecationWarning,
- "Override locateResponder, not lookupFunction.",
- __file__, lambda : locator.locateResponder("simple"))
- result = normalResponderObject(amp.Box(greeting="ni hao", cookie="5"))
- def done(values):
- self.assertEquals(values, amp.AmpBox(cookieplus='8'))
- return result.addCallback(done)
-
-
- def test_lookupFunctionDeprecatedInvoke(self):
- """
- Invoking locateResponder under its old name, lookupFunction, should
- emit a deprecation warning, but do the same thing.
- """
- locator = TestLocator()
- responderCallable = self.assertWarns(
- PendingDeprecationWarning,
- "Call locateResponder, not lookupFunction.", __file__,
- lambda : locator.lookupFunction("simple"))
- result = responderCallable(amp.Box(greeting="ni hao", cookie="5"))
- def done(values):
- self.assertEquals(values, amp.AmpBox(cookieplus='8'))
- return result.addCallback(done)
-
-
-
-SWITCH_CLIENT_DATA = 'Success!'
-SWITCH_SERVER_DATA = 'No, really. Success.'
-
-
-class BinaryProtocolTests(unittest.TestCase):
- """
- Tests for L{amp.BinaryBoxProtocol}.
- """
-
- def setUp(self):
- """
- Keep track of all boxes received by this test in its capacity as an
- L{IBoxReceiver} implementor.
- """
- self.boxes = []
- self.data = []
-
-
- def startReceivingBoxes(self, sender):
- """
- Implement L{IBoxReceiver.startReceivingBoxes} to do nothing.
- """
-
-
- def ampBoxReceived(self, box):
- """
- A box was received by the protocol.
- """
- self.boxes.append(box)
-
- stopReason = None
- def stopReceivingBoxes(self, reason):
- """
- Record the reason that we stopped receiving boxes.
- """
- self.stopReason = reason
-
-
- # fake ITransport
- def getPeer(self):
- return 'no peer'
-
-
- def getHost(self):
- return 'no host'
-
-
- def write(self, data):
- self.data.append(data)
-
-
- def test_receiveBoxStateMachine(self):
- """
- When a binary box protocol receives:
- * a key
- * a value
- * an empty string
- it should emit a box and send it to its boxReceiver.
- """
- a = amp.BinaryBoxProtocol(self)
- a.stringReceived("hello")
- a.stringReceived("world")
- a.stringReceived("")
- self.assertEquals(self.boxes, [amp.AmpBox(hello="world")])
-
-
- def test_receiveBoxData(self):
- """
- When a binary box protocol receives the serialized form of an AMP box,
- it should emit a similar box to its boxReceiver.
- """
- a = amp.BinaryBoxProtocol(self)
- a.dataReceived(amp.Box({"testKey": "valueTest",
- "anotherKey": "anotherValue"}).serialize())
- self.assertEquals(self.boxes,
- [amp.Box({"testKey": "valueTest",
- "anotherKey": "anotherValue"})])
-
-
- def test_sendBox(self):
- """
- When a binary box protocol sends a box, it should emit the serialized
- bytes of that box to its transport.
- """
- a = amp.BinaryBoxProtocol(self)
- a.makeConnection(self)
- aBox = amp.Box({"testKey": "valueTest",
- "someData": "hello"})
- a.makeConnection(self)
- a.sendBox(aBox)
- self.assertEquals(''.join(self.data), aBox.serialize())
-
-
- def test_connectionLostStopSendingBoxes(self):
- """
- When a binary box protocol loses its connection, it should notify its
- box receiver that it has stopped receiving boxes.
- """
- a = amp.BinaryBoxProtocol(self)
- a.makeConnection(self)
- aBox = amp.Box({"sample": "data"})
- a.makeConnection(self)
- connectionFailure = Failure(RuntimeError())
- a.connectionLost(connectionFailure)
- self.assertIdentical(self.stopReason, connectionFailure)
-
-
- def test_protocolSwitch(self):
- """
- L{BinaryBoxProtocol} has the capacity to switch to a different protocol
- on a box boundary. When a protocol is in the process of switching, it
- cannot receive traffic.
- """
- otherProto = TestProto(None, "outgoing data")
- test = self
- class SwitchyReceiver:
- switched = False
- def startReceivingBoxes(self, sender):
- pass
- def ampBoxReceived(self, box):
- test.assertFalse(self.switched,
- "Should only receive one box!")
- self.switched = True
- a._lockForSwitch()
- a._switchTo(otherProto)
- a = amp.BinaryBoxProtocol(SwitchyReceiver())
- anyOldBox = amp.Box({"include": "lots",
- "of": "data"})
- a.makeConnection(self)
- # Include a 0-length box at the beginning of the next protocol's data,
- # to make sure that AMP doesn't eat the data or try to deliver extra
- # boxes either...
- moreThanOneBox = anyOldBox.serialize() + "\x00\x00Hello, world!"
- a.dataReceived(moreThanOneBox)
- self.assertIdentical(otherProto.transport, self)
- self.assertEquals("".join(otherProto.data), "\x00\x00Hello, world!")
- self.assertEquals(self.data, ["outgoing data"])
- a.dataReceived("more data")
- self.assertEquals("".join(otherProto.data),
- "\x00\x00Hello, world!more data")
- self.assertRaises(amp.ProtocolSwitched, a.sendBox, anyOldBox)
-
-
- def test_protocolSwitchInvalidStates(self):
- """
- In order to make sure the protocol never gets any invalid data sent
- into the middle of a box, it must be locked for switching before it is
- switched. It can only be unlocked if the switch failed, and attempting
- to send a box while it is locked should raise an exception.
- """
- a = amp.BinaryBoxProtocol(self)
- a.makeConnection(self)
- sampleBox = amp.Box({"some": "data"})
- a._lockForSwitch()
- self.assertRaises(amp.ProtocolSwitched, a.sendBox, sampleBox)
- a._unlockFromSwitch()
- a.sendBox(sampleBox)
- self.assertEquals(''.join(self.data), sampleBox.serialize())
- a._lockForSwitch()
- otherProto = TestProto(None, "outgoing data")
- a._switchTo(otherProto)
- self.assertRaises(amp.ProtocolSwitched, a._unlockFromSwitch)
-
-
- def test_protocolSwitchLoseConnection(self):
- """
- When the protocol is switched, it should notify its nested protocol of
- disconnection.
- """
- class Loser(protocol.Protocol):
- reason = None
- def connectionLost(self, reason):
- self.reason = reason
- connectionLoser = Loser()
- a = amp.BinaryBoxProtocol(self)
- a.makeConnection(self)
- a._lockForSwitch()
- a._switchTo(connectionLoser)
- connectionFailure = Failure(RuntimeError())
- a.connectionLost(connectionFailure)
- self.assertEquals(connectionLoser.reason, connectionFailure)
-
-
- def test_protocolSwitchLoseClientConnection(self):
- """
- When the protocol is switched, it should notify its nested client
- protocol factory of disconnection.
- """
- class ClientLoser:
- reason = None
- def clientConnectionLost(self, connector, reason):
- self.reason = reason
- a = amp.BinaryBoxProtocol(self)
- connectionLoser = protocol.Protocol()
- clientLoser = ClientLoser()
- a.makeConnection(self)
- a._lockForSwitch()
- a._switchTo(connectionLoser, clientLoser)
- connectionFailure = Failure(RuntimeError())
- a.connectionLost(connectionFailure)
- self.assertEquals(clientLoser.reason, connectionFailure)
-
-
-
-class AMPTest(unittest.TestCase):
-
- def test_interfaceDeclarations(self):
- """
- The classes in the amp module ought to implement the interfaces that
- are declared for their benefit.
- """
- for interface, implementation in [(amp.IBoxSender, amp.BinaryBoxProtocol),
- (amp.IBoxReceiver, amp.BoxDispatcher),
- (amp.IResponderLocator, amp.CommandLocator),
- (amp.IResponderLocator, amp.SimpleStringLocator),
- (amp.IBoxSender, amp.AMP),
- (amp.IBoxReceiver, amp.AMP),
- (amp.IResponderLocator, amp.AMP)]:
- self.failUnless(interface.implementedBy(implementation),
- "%s does not implements(%s)" % (implementation, interface))
-
-
- def test_helloWorld(self):
- """
- Verify that a simple command can be sent and its response received with
- the simple low-level string-based API.
- """
- c, s, p = connectedServerAndClient()
- L = []
- HELLO = 'world'
- c.sendHello(HELLO).addCallback(L.append)
- p.flush()
- self.assertEquals(L[0]['hello'], HELLO)
-
-
- def test_wireFormatRoundTrip(self):
- """
- Verify that mixed-case, underscored and dashed arguments are mapped to
- their python names properly.
- """
- c, s, p = connectedServerAndClient()
- L = []
- HELLO = 'world'
- c.sendHello(HELLO).addCallback(L.append)
- p.flush()
- self.assertEquals(L[0]['hello'], HELLO)
-
-
- def test_helloWorldUnicode(self):
- """
- Verify that unicode arguments can be encoded and decoded.
- """
- c, s, p = connectedServerAndClient(
- ServerClass=SimpleSymmetricCommandProtocol,
- ClientClass=SimpleSymmetricCommandProtocol)
- L = []
- HELLO = 'world'
- HELLO_UNICODE = 'wor\u1234ld'
- c.sendUnicodeHello(HELLO, HELLO_UNICODE).addCallback(L.append)
- p.flush()
- self.assertEquals(L[0]['hello'], HELLO)
- self.assertEquals(L[0]['Print'], HELLO_UNICODE)
-
-
- def test_unknownCommandLow(self):
- """
- Verify that unknown commands using low-level APIs will be rejected with an
- error, but will NOT terminate the connection.
- """
- c, s, p = connectedServerAndClient()
- L = []
- def clearAndAdd(e):
- """
- You can't propagate the error...
- """
- e.trap(amp.UnhandledCommand)
- return "OK"
- c.callRemoteString("WTF").addErrback(clearAndAdd).addCallback(L.append)
- p.flush()
- self.assertEquals(L.pop(), "OK")
- HELLO = 'world'
- c.sendHello(HELLO).addCallback(L.append)
- p.flush()
- self.assertEquals(L[0]['hello'], HELLO)
-
-
- def test_unknownCommandHigh(self):
- """
- Verify that unknown commands using high-level APIs will be rejected with an
- error, but will NOT terminate the connection.
- """
- c, s, p = connectedServerAndClient()
- L = []
- def clearAndAdd(e):
- """
- You can't propagate the error...
- """
- e.trap(amp.UnhandledCommand)
- return "OK"
- c.callRemote(WTF).addErrback(clearAndAdd).addCallback(L.append)
- p.flush()
- self.assertEquals(L.pop(), "OK")
- HELLO = 'world'
- c.sendHello(HELLO).addCallback(L.append)
- p.flush()
- self.assertEquals(L[0]['hello'], HELLO)
-
-
- def test_brokenReturnValue(self):
- """
- It can be very confusing if you write some code which responds to a
- command, but gets the return value wrong. Most commonly you end up
- returning None instead of a dictionary.
-
- Verify that if that happens, the framework logs a useful error.
- """
- L = []
- SimpleSymmetricCommandProtocol().dispatchCommand(
- amp.AmpBox(_command=BrokenReturn.commandName)).addErrback(L.append)
- blr = L[0].trap(amp.BadLocalReturn)
- self.failUnlessIn('None', repr(L[0].value))
-
-
-
- def test_unknownArgument(self):
- """
- Verify that unknown arguments are ignored, and not passed to a Python
- function which can't accept them.
- """
- c, s, p = connectedServerAndClient(
- ServerClass=SimpleSymmetricCommandProtocol,
- ClientClass=SimpleSymmetricCommandProtocol)
- L = []
- HELLO = 'world'
- # c.sendHello(HELLO).addCallback(L.append)
- c.callRemote(FutureHello,
- hello=HELLO,
- bonus="I'm not in the book!").addCallback(
- L.append)
- p.flush()
- self.assertEquals(L[0]['hello'], HELLO)
-
-
- def test_simpleReprs(self):
- """
- Verify that the various Box objects repr properly, for debugging.
- """
- self.assertEquals(type(repr(amp._TLSBox())), str)
- self.assertEquals(type(repr(amp._SwitchBox('a'))), str)
- self.assertEquals(type(repr(amp.QuitBox())), str)
- self.assertEquals(type(repr(amp.AmpBox())), str)
- self.failUnless("AmpBox" in repr(amp.AmpBox()))
-
- def test_keyTooLong(self):
- """
- Verify that a key that is too long will immediately raise a synchronous
- exception.
- """
- c, s, p = connectedServerAndClient()
- L = []
- x = "H" * (0xff+1)
- tl = self.assertRaises(amp.TooLong,
- c.callRemoteString, "Hello",
- **{x: "hi"})
- self.failUnless(tl.isKey)
- self.failUnless(tl.isLocal)
- self.failUnlessIdentical(tl.keyName, None)
- self.failUnlessIdentical(tl.value, x)
- self.failUnless(str(len(x)) in repr(tl))
- self.failUnless("key" in repr(tl))
-
-
- def test_valueTooLong(self):
- """
- Verify that attempting to send value longer than 64k will immediately
- raise an exception.
- """
- c, s, p = connectedServerAndClient()
- L = []
- x = "H" * (0xffff+1)
- tl = self.assertRaises(amp.TooLong, c.sendHello, x)
- p.flush()
- self.failIf(tl.isKey)
- self.failUnless(tl.isLocal)
- self.failUnlessIdentical(tl.keyName, 'hello')
- self.failUnlessIdentical(tl.value, x)
- self.failUnless(str(len(x)) in repr(tl))
- self.failUnless("value" in repr(tl))
- self.failUnless('hello' in repr(tl))
-
-
- def test_helloWorldCommand(self):
- """
- Verify that a simple command can be sent and its response received with
- the high-level value parsing API.
- """
- c, s, p = connectedServerAndClient(
- ServerClass=SimpleSymmetricCommandProtocol,
- ClientClass=SimpleSymmetricCommandProtocol)
- L = []
- HELLO = 'world'
- c.sendHello(HELLO).addCallback(L.append)
- p.flush()
- self.assertEquals(L[0]['hello'], HELLO)
-
-
- def test_helloErrorHandling(self):
- """
- Verify that if a known error type is raised and handled, it will be
- properly relayed to the other end of the connection and translated into
- an exception, and no error will be logged.
- """
- L=[]
- c, s, p = connectedServerAndClient(
- ServerClass=SimpleSymmetricCommandProtocol,
- ClientClass=SimpleSymmetricCommandProtocol)
- HELLO = 'fuck you'
- c.sendHello(HELLO).addErrback(L.append)
- p.flush()
- L[0].trap(UnfriendlyGreeting)
- self.assertEquals(str(L[0].value), "Don't be a dick.")
-
-
- def test_helloFatalErrorHandling(self):
- """
- Verify that if a known, fatal error type is raised and handled, it will
- be properly relayed to the other end of the connection and translated
- into an exception, no error will be logged, and the connection will be
- terminated.
- """
- L=[]
- c, s, p = connectedServerAndClient(
- ServerClass=SimpleSymmetricCommandProtocol,
- ClientClass=SimpleSymmetricCommandProtocol)
- HELLO = 'die'
- c.sendHello(HELLO).addErrback(L.append)
- p.flush()
- L.pop().trap(DeathThreat)
- c.sendHello(HELLO).addErrback(L.append)
- p.flush()
- L.pop().trap(error.ConnectionDone)
-
-
-
- def test_helloNoErrorHandling(self):
- """
- Verify that if an unknown error type is raised, it will be relayed to
- the other end of the connection and translated into an exception, it
- will be logged, and then the connection will be dropped.
- """
- L=[]
- c, s, p = connectedServerAndClient(
- ServerClass=SimpleSymmetricCommandProtocol,
- ClientClass=SimpleSymmetricCommandProtocol)
- HELLO = THING_I_DONT_UNDERSTAND
- c.sendHello(HELLO).addErrback(L.append)
- p.flush()
- ure = L.pop()
- ure.trap(amp.UnknownRemoteError)
- c.sendHello(HELLO).addErrback(L.append)
- cl = L.pop()
- cl.trap(error.ConnectionDone)
- # The exception should have been logged.
- self.failUnless(self.flushLoggedErrors(ThingIDontUnderstandError))
-
-
-
- def test_lateAnswer(self):
- """
- Verify that a command that does not get answered until after the
- connection terminates will not cause any errors.
- """
- c, s, p = connectedServerAndClient(
- ServerClass=SimpleSymmetricCommandProtocol,
- ClientClass=SimpleSymmetricCommandProtocol)
- L = []
- HELLO = 'world'
- c.callRemote(WaitForever).addErrback(L.append)
- p.flush()
- self.assertEquals(L, [])
- s.transport.loseConnection()
- p.flush()
- L.pop().trap(error.ConnectionDone)
- # Just make sure that it doesn't error...
- s.waiting.callback({})
- return s.waiting
-
-
- def test_requiresNoAnswer(self):
- """
- Verify that a command that requires no answer is run.
- """
- L=[]
- c, s, p = connectedServerAndClient(
- ServerClass=SimpleSymmetricCommandProtocol,
- ClientClass=SimpleSymmetricCommandProtocol)
- HELLO = 'world'
- c.callRemote(NoAnswerHello, hello=HELLO)
- p.flush()
- self.failUnless(s.greeted)
-
-
- def test_requiresNoAnswerFail(self):
- """
- Verify that commands sent after a failed no-answer request do not complete.
- """
- L=[]
- c, s, p = connectedServerAndClient(
- ServerClass=SimpleSymmetricCommandProtocol,
- ClientClass=SimpleSymmetricCommandProtocol)
- HELLO = 'fuck you'
- c.callRemote(NoAnswerHello, hello=HELLO)
- p.flush()
- # This should be logged locally.
- self.failUnless(self.flushLoggedErrors(amp.RemoteAmpError))
- HELLO = 'world'
- c.callRemote(Hello, hello=HELLO).addErrback(L.append)
- p.flush()
- L.pop().trap(error.ConnectionDone)
- self.failIf(s.greeted)
-
-
- def test_noAnswerResponderBadAnswer(self):
- """
- Verify that responders of requiresAnswer=False commands have to return
- a dictionary anyway.
-
- (requiresAnswer is a hint from the _client_ - the server may be called
- upon to answer commands in any case, if the client wants to know when
- they complete.)
- """
- c, s, p = connectedServerAndClient(
- ServerClass=BadNoAnswerCommandProtocol,
- ClientClass=SimpleSymmetricCommandProtocol)
- c.callRemote(NoAnswerHello, hello="hello")
- p.flush()
- le = self.flushLoggedErrors(amp.BadLocalReturn)
- self.assertEquals(len(le), 1)
-
-
- def test_noAnswerResponderAskedForAnswer(self):
- """
- Verify that responders with requiresAnswer=False will actually respond
- if the client sets requiresAnswer=True. In other words, verify that
- requiresAnswer is a hint honored only by the client.
- """
- c, s, p = connectedServerAndClient(
- ServerClass=NoAnswerCommandProtocol,
- ClientClass=SimpleSymmetricCommandProtocol)
- L = []
- c.callRemote(Hello, hello="Hello!").addCallback(L.append)
- p.flush()
- self.assertEquals(len(L), 1)
- self.assertEquals(L, [dict(hello="Hello!-noanswer",
- Print=None)]) # Optional response argument
-
-
- def test_ampListCommand(self):
- """
- Test encoding of an argument that uses the AmpList encoding.
- """
- c, s, p = connectedServerAndClient(
- ServerClass=SimpleSymmetricCommandProtocol,
- ClientClass=SimpleSymmetricCommandProtocol)
- L = []
- c.callRemote(GetList, length=10).addCallback(L.append)
- p.flush()
- values = L.pop().get('body')
- self.assertEquals(values, [{'x': 1}] * 10)
-
-
- def test_failEarlyOnArgSending(self):
- """
- Verify that if we pass an invalid argument list (omitting an argument), an
- exception will be raised.
- """
- okayCommand = Hello(hello="What?")
- self.assertRaises(amp.InvalidSignature, Hello)
-
-
- def test_doubleProtocolSwitch(self):
- """
- As a debugging aid, a protocol system should raise a
- L{ProtocolSwitched} exception when asked to switch a protocol that is
- already switched.
- """
- serverDeferred = defer.Deferred()
- serverProto = SimpleSymmetricCommandProtocol(serverDeferred)
- clientDeferred = defer.Deferred()
- clientProto = SimpleSymmetricCommandProtocol(clientDeferred)
- c, s, p = connectedServerAndClient(ServerClass=lambda: serverProto,
- ClientClass=lambda: clientProto)
- def switched(result):
- self.assertRaises(amp.ProtocolSwitched, c.switchToTestProtocol)
- self.testSucceeded = True
- c.switchToTestProtocol().addCallback(switched)
- p.flush()
- self.failUnless(self.testSucceeded)
-
-
- def test_protocolSwitch(self, switcher=SimpleSymmetricCommandProtocol,
- spuriousTraffic=False,
- spuriousError=False):
- """
- Verify that it is possible to switch to another protocol mid-connection and
- send data to it successfully.
- """
- self.testSucceeded = False
-
- serverDeferred = defer.Deferred()
- serverProto = switcher(serverDeferred)
- clientDeferred = defer.Deferred()
- clientProto = switcher(clientDeferred)
- c, s, p = connectedServerAndClient(ServerClass=lambda: serverProto,
- ClientClass=lambda: clientProto)
-
- if spuriousTraffic:
- wfdr = [] # remote
- wfd = c.callRemote(WaitForever).addErrback(wfdr.append)
- switchDeferred = c.switchToTestProtocol()
- if spuriousTraffic:
- self.assertRaises(amp.ProtocolSwitched, c.sendHello, 'world')
-
- def cbConnsLost(((serverSuccess, serverData),
- (clientSuccess, clientData))):
- self.failUnless(serverSuccess)
- self.failUnless(clientSuccess)
- self.assertEquals(''.join(serverData), SWITCH_CLIENT_DATA)
- self.assertEquals(''.join(clientData), SWITCH_SERVER_DATA)
- self.testSucceeded = True
-
- def cbSwitch(proto):
- return defer.DeferredList(
- [serverDeferred, clientDeferred]).addCallback(cbConnsLost)
-
- switchDeferred.addCallback(cbSwitch)
- p.flush()
- if serverProto.maybeLater is not None:
- serverProto.maybeLater.callback(serverProto.maybeLaterProto)
- p.flush()
- if spuriousTraffic:
- # switch is done here; do this here to make sure that if we're
- # going to corrupt the connection, we do it before it's closed.
- if spuriousError:
- s.waiting.errback(amp.RemoteAmpError(
- "SPURIOUS",
- "Here's some traffic in the form of an error."))
- else:
- s.waiting.callback({})
- p.flush()
- c.transport.loseConnection() # close it
- p.flush()
- self.failUnless(self.testSucceeded)
-
-
- def test_protocolSwitchDeferred(self):
- """
- Verify that protocol-switching even works if the value returned from
- the command that does the switch is deferred.
- """
- return self.test_protocolSwitch(switcher=DeferredSymmetricCommandProtocol)
-
-
- def test_protocolSwitchFail(self, switcher=SimpleSymmetricCommandProtocol):
- """
- Verify that if we try to switch protocols and it fails, the connection
- stays up and we can go back to speaking AMP.
- """
- self.testSucceeded = False
-
- serverDeferred = defer.Deferred()
- serverProto = switcher(serverDeferred)
- clientDeferred = defer.Deferred()
- clientProto = switcher(clientDeferred)
- c, s, p = connectedServerAndClient(ServerClass=lambda: serverProto,
- ClientClass=lambda: clientProto)
- L = []
- switchDeferred = c.switchToTestProtocol(fail=True).addErrback(L.append)
- p.flush()
- L.pop().trap(UnknownProtocol)
- self.failIf(self.testSucceeded)
- # It's a known error, so let's send a "hello" on the same connection;
- # it should work.
- c.sendHello('world').addCallback(L.append)
- p.flush()
- self.assertEqual(L.pop()['hello'], 'world')
-
-
- def test_trafficAfterSwitch(self):
- """
- Verify that attempts to send traffic after a switch will not corrupt
- the nested protocol.
- """
- return self.test_protocolSwitch(spuriousTraffic=True)
-
-
- def test_errorAfterSwitch(self):
- """
- Returning an error after a protocol switch should record the underlying
- error.
- """
- return self.test_protocolSwitch(spuriousTraffic=True,
- spuriousError=True)
-
-
- def test_quitBoxQuits(self):
- """
- Verify that commands with a responseType of QuitBox will in fact
- terminate the connection.
- """
- c, s, p = connectedServerAndClient(
- ServerClass=SimpleSymmetricCommandProtocol,
- ClientClass=SimpleSymmetricCommandProtocol)
-
- L = []
- HELLO = 'world'
- GOODBYE = 'everyone'
- c.sendHello(HELLO).addCallback(L.append)
- p.flush()
- self.assertEquals(L.pop()['hello'], HELLO)
- c.callRemote(Goodbye).addCallback(L.append)
- p.flush()
- self.assertEquals(L.pop()['goodbye'], GOODBYE)
- c.sendHello(HELLO).addErrback(L.append)
- L.pop().trap(error.ConnectionDone)
-
-
- def test_basicLiteralEmit(self):
- """
- Verify that the command dictionaries for a callRemoteN look correct
- after being serialized and parsed.
- """
- c, s, p = connectedServerAndClient()
- L = []
- s.ampBoxReceived = L.append
- c.callRemote(Hello, hello='hello test', mixedCase='mixed case arg test',
- dash_arg='x', underscore_arg='y')
- p.flush()
- self.assertEquals(len(L), 1)
- for k, v in [('_command', Hello.commandName),
- ('hello', 'hello test'),
- ('mixedCase', 'mixed case arg test'),
- ('dash-arg', 'x'),
- ('underscore_arg', 'y')]:
- self.assertEquals(L[-1].pop(k), v)
- L[-1].pop('_ask')
- self.assertEquals(L[-1], {})
-
-
- def test_basicStructuredEmit(self):
- """
- Verify that a call similar to basicLiteralEmit's is handled properly with
- high-level quoting and passing to Python methods, and that argument
- names are correctly handled.
- """
- L = []
- class StructuredHello(amp.AMP):
- def h(self, *a, **k):
- L.append((a, k))
- return dict(hello='aaa')
- Hello.responder(h)
- c, s, p = connectedServerAndClient(ServerClass=StructuredHello)
- c.callRemote(Hello, hello='hello test', mixedCase='mixed case arg test',
- dash_arg='x', underscore_arg='y').addCallback(L.append)
- p.flush()
- self.assertEquals(len(L), 2)
- self.assertEquals(L[0],
- ((), dict(
- hello='hello test',
- mixedCase='mixed case arg test',
- dash_arg='x',
- underscore_arg='y',
-
- # XXX - should optional arguments just not be passed?
- # passing None seems a little odd, looking at the way it
- # turns out here... -glyph
- From=('file', 'file'),
- Print=None,
- optional=None,
- )))
- self.assertEquals(L[1], dict(Print=None, hello='aaa'))
-
-class PretendRemoteCertificateAuthority:
- def checkIsPretendRemote(self):
- return True
-
-class IOSimCert:
- verifyCount = 0
-
- def options(self, *ign):
- return self
-
- def iosimVerify(self, otherCert):
- """
- This isn't a real certificate, and wouldn't work on a real socket, but
- iosim specifies a different API so that we don't have to do any crypto
- math to demonstrate that the right functions get called in the right
- places.
- """
- assert otherCert is self
- self.verifyCount += 1
- return True
-
-class OKCert(IOSimCert):
- def options(self, x):
- assert x.checkIsPretendRemote()
- return self
-
-class GrumpyCert(IOSimCert):
- def iosimVerify(self, otherCert):
- self.verifyCount += 1
- return False
-
-class DroppyCert(IOSimCert):
- def __init__(self, toDrop):
- self.toDrop = toDrop
-
- def iosimVerify(self, otherCert):
- self.verifyCount += 1
- self.toDrop.loseConnection()
- return True
-
-class SecurableProto(FactoryNotifier):
-
- factory = None
-
- def verifyFactory(self):
- return [PretendRemoteCertificateAuthority()]
-
- def getTLSVars(self):
- cert = self.certFactory()
- verify = self.verifyFactory()
- return dict(
- tls_localCertificate=cert,
- tls_verifyAuthorities=verify)
- amp.StartTLS.responder(getTLSVars)
-
-
-
-class TLSTest(unittest.TestCase):
- def test_startingTLS(self):
- """
- Verify that starting TLS and succeeding at handshaking sends all the
- notifications to all the right places.
- """
- cli, svr, p = connectedServerAndClient(
- ServerClass=SecurableProto,
- ClientClass=SecurableProto)
-
- okc = OKCert()
- svr.certFactory = lambda : okc
-
- cli.callRemote(
- amp.StartTLS, tls_localCertificate=okc,
- tls_verifyAuthorities=[PretendRemoteCertificateAuthority()])
-
- # let's buffer something to be delivered securely
- L = []
- d = cli.callRemote(SecuredPing).addCallback(L.append)
- p.flush()
- # once for client once for server
- self.assertEquals(okc.verifyCount, 2)
- L = []
- d = cli.callRemote(SecuredPing).addCallback(L.append)
- p.flush()
- self.assertEqual(L[0], {'pinged': True})
-
-
- def test_startTooManyTimes(self):
- """
- Verify that the protocol will complain if we attempt to renegotiate TLS,
- which we don't support.
- """
- cli, svr, p = connectedServerAndClient(
- ServerClass=SecurableProto,
- ClientClass=SecurableProto)
-
- okc = OKCert()
- svr.certFactory = lambda : okc
-
- cli.callRemote(amp.StartTLS,
- tls_localCertificate=okc,
- tls_verifyAuthorities=[PretendRemoteCertificateAuthority()])
- p.flush()
- cli.noPeerCertificate = True # this is totally fake
- self.assertRaises(
- amp.OnlyOneTLS,
- cli.callRemote,
- amp.StartTLS,
- tls_localCertificate=okc,
- tls_verifyAuthorities=[PretendRemoteCertificateAuthority()])
-
-
- def test_negotiationFailed(self):
- """
- Verify that starting TLS and failing on both sides at handshaking sends
- notifications to all the right places and terminates the connection.
- """
-
- badCert = GrumpyCert()
-
- cli, svr, p = connectedServerAndClient(
- ServerClass=SecurableProto,
- ClientClass=SecurableProto)
- svr.certFactory = lambda : badCert
-
- cli.callRemote(amp.StartTLS,
- tls_localCertificate=badCert)
-
- p.flush()
- # once for client once for server - but both fail
- self.assertEquals(badCert.verifyCount, 2)
- d = cli.callRemote(SecuredPing)
- p.flush()
- self.assertFailure(d, iosim.OpenSSLVerifyError)
-
-
- def test_negotiationFailedByClosing(self):
- """
- Verify that starting TLS and failing by way of a lost connection
- notices that it is probably an SSL problem.
- """
-
- cli, svr, p = connectedServerAndClient(
- ServerClass=SecurableProto,
- ClientClass=SecurableProto)
- droppyCert = DroppyCert(svr.transport)
- svr.certFactory = lambda : droppyCert
-
- secure = cli.callRemote(amp.StartTLS,
- tls_localCertificate=droppyCert)
-
- p.flush()
-
- self.assertEquals(droppyCert.verifyCount, 2)
-
- d = cli.callRemote(SecuredPing)
- p.flush()
-
- # it might be a good idea to move this exception somewhere more
- # reasonable.
- self.assertFailure(d, error.PeerVerifyError)
-
-
-
-class InheritedError(Exception):
- """
- This error is used to check inheritance.
- """
-
-
-
-class OtherInheritedError(Exception):
- """
- This is a distinct error for checking inheritance.
- """
-
-
-
-class BaseCommand(amp.Command):
- """
- This provides a command that will be subclassed.
- """
- errors = {InheritedError: 'INHERITED_ERROR'}
-
-
-
-class InheritedCommand(BaseCommand):
- """
- This is a command which subclasses another command but does not override
- anything.
- """
-
-
-
-class AddErrorsCommand(BaseCommand):
- """
- This is a command which subclasses another command but adds errors to the
- list.
- """
- arguments = [('other', amp.Boolean())]
- errors = {OtherInheritedError: 'OTHER_INHERITED_ERROR'}
-
-
-
-class NormalCommandProtocol(amp.AMP):
- """
- This is a protocol which responds to L{BaseCommand}, and is used to test
- that inheritance does not interfere with the normal handling of errors.
- """
- def resp(self):
- raise InheritedError()
- BaseCommand.responder(resp)
-
-
-
-class InheritedCommandProtocol(amp.AMP):
- """
- This is a protocol which responds to L{InheritedCommand}, and is used to
- test that inherited commands inherit their bases' errors if they do not
- respond to any of their own.
- """
- def resp(self):
- raise InheritedError()
- InheritedCommand.responder(resp)
-
-
-
-class AddedCommandProtocol(amp.AMP):
- """
- This is a protocol which responds to L{AddErrorsCommand}, and is used to
- test that inherited commands can add their own new types of errors, but
- still respond in the same way to their parents types of errors.
- """
- def resp(self, other):
- if other:
- raise OtherInheritedError()
- else:
- raise InheritedError()
- AddErrorsCommand.responder(resp)
-
-
-
-class CommandInheritanceTests(unittest.TestCase):
- """
- These tests verify that commands inherit error conditions properly.
- """
-
- def errorCheck(self, err, proto, cmd, **kw):
- """
- Check that the appropriate kind of error is raised when a given command
- is sent to a given protocol.
- """
- c, s, p = connectedServerAndClient(ServerClass=proto,
- ClientClass=proto)
- d = c.callRemote(cmd, **kw)
- d2 = self.failUnlessFailure(d, err)
- p.flush()
- return d2
-
-
- def test_basicErrorPropagation(self):
- """
- Verify that errors specified in a superclass are respected normally
- even if it has subclasses.
- """
- return self.errorCheck(
- InheritedError, NormalCommandProtocol, BaseCommand)
-
-
- def test_inheritedErrorPropagation(self):
- """
- Verify that errors specified in a superclass command are propagated to
- its subclasses.
- """
- return self.errorCheck(
- InheritedError, InheritedCommandProtocol, InheritedCommand)
-
-
- def test_inheritedErrorAddition(self):
- """
- Verify that new errors specified in a subclass of an existing command
- are honored even if the superclass defines some errors.
- """
- return self.errorCheck(
- OtherInheritedError, AddedCommandProtocol, AddErrorsCommand, other=True)
-
-
- def test_additionWithOriginalError(self):
- """
- Verify that errors specified in a command's superclass are respected
- even if that command defines new errors itself.
- """
- return self.errorCheck(
- InheritedError, AddedCommandProtocol, AddErrorsCommand, other=False)
-
-
-def _loseAndPass(err, proto):
- # be specific, pass on the error to the client.
- err.trap(error.ConnectionLost, error.ConnectionDone)
- del proto.connectionLost
- proto.connectionLost(err)
-
-
-class LiveFireBase:
- """
- Utility for connected reactor-using tests.
- """
-
- def setUp(self):
- """
- Create an amp server and connect a client to it.
- """
- from twisted.internet import reactor
- self.serverFactory = protocol.ServerFactory()
- self.serverFactory.protocol = self.serverProto
- self.clientFactory = protocol.ClientFactory()
- self.clientFactory.protocol = self.clientProto
- self.clientFactory.onMade = defer.Deferred()
- self.serverFactory.onMade = defer.Deferred()
- self.serverPort = reactor.listenTCP(0, self.serverFactory)
- self.addCleanup(self.serverPort.stopListening)
- self.clientConn = reactor.connectTCP(
- '127.0.0.1', self.serverPort.getHost().port,
- self.clientFactory)
- self.addCleanup(self.clientConn.disconnect)
- def getProtos(rlst):
- self.cli = self.clientFactory.theProto
- self.svr = self.serverFactory.theProto
- dl = defer.DeferredList([self.clientFactory.onMade,
- self.serverFactory.onMade])
- return dl.addCallback(getProtos)
-
- def tearDown(self):
- """
- Cleanup client and server connections, and check the error got at
- C{connectionLost}.
- """
- L = []
- for conn in self.cli, self.svr:
- if conn.transport is not None:
- # depend on amp's function connection-dropping behavior
- d = defer.Deferred().addErrback(_loseAndPass, conn)
- conn.connectionLost = d.errback
- conn.transport.loseConnection()
- L.append(d)
- return defer.gatherResults(L
- ).addErrback(lambda first: first.value.subFailure)
-
-
-def show(x):
- import sys
- sys.stdout.write(x+'\n')
- sys.stdout.flush()
-
-
-def tempSelfSigned():
- from twisted.internet import ssl
-
- sharedDN = ssl.DN(CN='shared')
- key = ssl.KeyPair.generate()
- cr = key.certificateRequest(sharedDN)
- sscrd = key.signCertificateRequest(
- sharedDN, cr, lambda dn: True, 1234567)
- cert = key.newCertificate(sscrd)
- return cert
-
-tempcert = tempSelfSigned()
-
-
-class LiveFireTLSTestCase(LiveFireBase, unittest.TestCase):
- clientProto = SecurableProto
- serverProto = SecurableProto
- def test_liveFireCustomTLS(self):
- """
- Using real, live TLS, actually negotiate a connection.
-
- This also looks at the 'peerCertificate' attribute's correctness, since
- that's actually loaded using OpenSSL calls, but the main purpose is to
- make sure that we didn't miss anything obvious in iosim about TLS
- negotiations.
- """
-
- cert = tempcert
-
- self.svr.verifyFactory = lambda : [cert]
- self.svr.certFactory = lambda : cert
- # only needed on the server, we specify the client below.
-
- def secured(rslt):
- x = cert.digest()
- def pinged(rslt2):
- # Interesting. OpenSSL won't even _tell_ us about the peer
- # cert until we negotiate. we should be able to do this in
- # 'secured' instead, but it looks like we can't. I think this
- # is a bug somewhere far deeper than here.
- self.failUnlessEqual(x, self.cli.hostCertificate.digest())
- self.failUnlessEqual(x, self.cli.peerCertificate.digest())
- self.failUnlessEqual(x, self.svr.hostCertificate.digest())
- self.failUnlessEqual(x, self.svr.peerCertificate.digest())
- return self.cli.callRemote(SecuredPing).addCallback(pinged)
- return self.cli.callRemote(amp.StartTLS,
- tls_localCertificate=cert,
- tls_verifyAuthorities=[cert]).addCallback(secured)
-
-
-class SlightlySmartTLS(SimpleSymmetricCommandProtocol):
- """
- Specific implementation of server side protocol with different
- management of TLS.
- """
- def getTLSVars(self):
- """
- @return: the global C{tempcert} certificate as local certificate.
- """
- return dict(tls_localCertificate=tempcert)
- amp.StartTLS.responder(getTLSVars)
-
-
-class PlainVanillaLiveFire(LiveFireBase, unittest.TestCase):
-
- clientProto = SimpleSymmetricCommandProtocol
- serverProto = SimpleSymmetricCommandProtocol
-
- def test_liveFireDefaultTLS(self):
- """
- Verify that out of the box, we can start TLS to at least encrypt the
- connection, even if we don't have any certificates to use.
- """
- def secured(result):
- return self.cli.callRemote(SecuredPing)
- return self.cli.callRemote(amp.StartTLS).addCallback(secured)
-
-
-class WithServerTLSVerification(LiveFireBase, unittest.TestCase):
- clientProto = SimpleSymmetricCommandProtocol
- serverProto = SlightlySmartTLS
-
- def test_anonymousVerifyingClient(self):
- """
- Verify that anonymous clients can verify server certificates.
- """
- def secured(result):
- return self.cli.callRemote(SecuredPing)
- return self.cli.callRemote(amp.StartTLS,
- tls_verifyAuthorities=[tempcert]
- ).addCallback(secured)
-
-
-
-class ProtocolIncludingArgument(amp.Argument):
- """
- An L{amp.Argument} which encodes its parser and serializer
- arguments *including the protocol* into its parsed and serialized
- forms.
- """
-
- def fromStringProto(self, string, protocol):
- """
- Don't decode anything; just return all possible information.
-
- @return: A two-tuple of the input string and the protocol.
- """
- return (string, protocol)
-
- def toStringProto(self, obj, protocol):
- """
- Encode identifying information about L{object} and protocol
- into a string for later verification.
-
- @type obj: L{object}
- @type protocol: L{amp.AMP}
- """
- return "%s:%s" % (id(obj), id(protocol))
-
-
-
-class ProtocolIncludingCommand(amp.Command):
- """
- A command that has argument and response schemas which use
- L{ProtocolIncludingArgument}.
- """
- arguments = [('weird', ProtocolIncludingArgument())]
- response = [('weird', ProtocolIncludingArgument())]
-
-
-
-class MagicSchemaCommand(amp.Command):
- """
- A command which overrides L{parseResponse}, L{parseArguments}, and
- L{makeResponse}.
- """
- def parseResponse(self, strings, protocol):
- """
- Don't do any parsing, just jam the input strings and protocol
- onto the C{protocol.parseResponseArguments} attribute as a
- two-tuple. Return the original strings.
- """
- protocol.parseResponseArguments = (strings, protocol)
- return strings
- parseResponse = classmethod(parseResponse)
-
-
- def parseArguments(cls, strings, protocol):
- """
- Don't do any parsing, just jam the input strings and protocol
- onto the C{protocol.parseArgumentsArguments} attribute as a
- two-tuple. Return the original strings.
- """
- protocol.parseArgumentsArguments = (strings, protocol)
- return strings
- parseArguments = classmethod(parseArguments)
-
-
- def makeArguments(cls, objects, protocol):
- """
- Don't do any serializing, just jam the input strings and protocol
- onto the C{protocol.makeArgumentsArguments} attribute as a
- two-tuple. Return the original strings.
- """
- protocol.makeArgumentsArguments = (objects, protocol)
- return objects
- makeArguments = classmethod(makeArguments)
-
-
-
-class NoNetworkProtocol(amp.AMP):
- """
- An L{amp.AMP} subclass which overrides private methods to avoid
- testing the network. It also provides a responder for
- L{MagicSchemaCommand} that does nothing, so that tests can test
- aspects of the interaction of L{amp.Command}s and L{amp.AMP}.
-
- @ivar parseArgumentsArguments: Arguments that have been passed to any
- L{MagicSchemaCommand}, if L{MagicSchemaCommand} has been handled by
- this protocol.
-
- @ivar parseResponseArguments: Responses that have been returned from a
- L{MagicSchemaCommand}, if L{MagicSchemaCommand} has been handled by
- this protocol.
-
- @ivar makeArgumentsArguments: Arguments that have been serialized by any
- L{MagicSchemaCommand}, if L{MagicSchemaCommand} has been handled by
- this protocol.
- """
- def _sendBoxCommand(self, commandName, strings, requiresAnswer):
- """
- Return a Deferred which fires with the original strings.
- """
- return defer.succeed(strings)
-
- MagicSchemaCommand.responder(lambda s, weird: {})
-
-
-
-class MyBox(dict):
- """
- A unique dict subclass.
- """
-
-
-
-class ProtocolIncludingCommandWithDifferentCommandType(
- ProtocolIncludingCommand):
- """
- A L{ProtocolIncludingCommand} subclass whose commandType is L{MyBox}
- """
- commandType = MyBox
-
-
-
-class CommandTestCase(unittest.TestCase):
- """
- Tests for L{amp.Command}.
- """
-
- def test_parseResponse(self):
- """
- There should be a class method of Command which accepts a
- mapping of argument names to serialized forms and returns a
- similar mapping whose values have been parsed via the
- Command's response schema.
- """
- protocol = object()
- result = 'whatever'
- strings = {'weird': result}
- self.assertEqual(
- ProtocolIncludingCommand.parseResponse(strings, protocol),
- {'weird': (result, protocol)})
-
-
- def test_callRemoteCallsParseResponse(self):
- """
- Making a remote call on a L{amp.Command} subclass which
- overrides the C{parseResponse} method should call that
- C{parseResponse} method to get the response.
- """
- client = NoNetworkProtocol()
- thingy = "weeoo"
- response = client.callRemote(MagicSchemaCommand, weird=thingy)
- def gotResponse(ign):
- self.assertEquals(client.parseResponseArguments,
- ({"weird": thingy}, client))
- response.addCallback(gotResponse)
- return response
-
-
- def test_parseArguments(self):
- """
- There should be a class method of L{amp.Command} which accepts
- a mapping of argument names to serialized forms and returns a
- similar mapping whose values have been parsed via the
- command's argument schema.
- """
- protocol = object()
- result = 'whatever'
- strings = {'weird': result}
- self.assertEqual(
- ProtocolIncludingCommand.parseArguments(strings, protocol),
- {'weird': (result, protocol)})
-
-
- def test_responderCallsParseArguments(self):
- """
- Making a remote call on a L{amp.Command} subclass which
- overrides the C{parseArguments} method should call that
- C{parseArguments} method to get the arguments.
- """
- protocol = NoNetworkProtocol()
- responder = protocol.locateResponder(MagicSchemaCommand.commandName)
- argument = object()
- response = responder(dict(weird=argument))
- response.addCallback(
- lambda ign: self.assertEqual(protocol.parseArgumentsArguments,
- ({"weird": argument}, protocol)))
- return response
-
-
- def test_makeArguments(self):
- """
- There should be a class method of L{amp.Command} which accepts
- a mapping of argument names to objects and returns a similar
- mapping whose values have been serialized via the command's
- argument schema.
- """
- protocol = object()
- argument = object()
- objects = {'weird': argument}
- self.assertEqual(
- ProtocolIncludingCommand.makeArguments(objects, protocol),
- {'weird': "%d:%d" % (id(argument), id(protocol))})
-
-
- def test_makeArgumentsUsesCommandType(self):
- """
- L{amp.Command.makeArguments}'s return type should be the type
- of the result of L{amp.Command.commandType}.
- """
- protocol = object()
- objects = {"weird": "whatever"}
-
- result = ProtocolIncludingCommandWithDifferentCommandType.makeArguments(
- objects, protocol)
- self.assertIdentical(type(result), MyBox)
-
-
- def test_callRemoteCallsMakeArguments(self):
- """
- Making a remote call on a L{amp.Command} subclass which
- overrides the C{makeArguments} method should call that
- C{makeArguments} method to get the response.
- """
- client = NoNetworkProtocol()
- argument = object()
- response = client.callRemote(MagicSchemaCommand, weird=argument)
- def gotResponse(ign):
- self.assertEqual(client.makeArgumentsArguments,
- ({"weird": argument}, client))
- response.addCallback(gotResponse)
- return response
-
-if not interfaces.IReactorSSL.providedBy(reactor):
- skipMsg = 'This test case requires SSL support in the reactor'
- TLSTest.skip = skipMsg
- LiveFireTLSTestCase.skip = skipMsg
- PlainVanillaLiveFire.skip = skipMsg
- WithServerTLSVerification.skip = skipMsg
-
« no previous file with comments | « third_party/twisted_8_1/twisted/test/test_adbapi.py ('k') | third_party/twisted_8_1/twisted/test/test_application.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698