| Index: third_party/twisted_8_1/twisted/test/test_amp.py
|
| diff --git a/third_party/twisted_8_1/twisted/test/test_amp.py b/third_party/twisted_8_1/twisted/test/test_amp.py
|
| deleted file mode 100644
|
| index 50271220065f055c52ea6781bc1ff6ea4bd1e73c..0000000000000000000000000000000000000000
|
| --- a/third_party/twisted_8_1/twisted/test/test_amp.py
|
| +++ /dev/null
|
| @@ -1,2115 +0,0 @@
|
| -# Copyright (c) 2005 Divmod, Inc.
|
| -# Copyright (c) 2007 Twisted Matrix Laboratories.
|
| -# See LICENSE for details.
|
| -
|
| -from twisted.python import filepath
|
| -from twisted.python.failure import Failure
|
| -from twisted.protocols import amp
|
| -from twisted.test import iosim
|
| -from twisted.trial import unittest
|
| -from twisted.internet import protocol, defer, error, reactor, interfaces
|
| -
|
| -
|
| -class TestProto(protocol.Protocol):
|
| - def __init__(self, onConnLost, dataToSend):
|
| - self.onConnLost = onConnLost
|
| - self.dataToSend = dataToSend
|
| -
|
| - def connectionMade(self):
|
| - self.data = []
|
| - self.transport.write(self.dataToSend)
|
| -
|
| - def dataReceived(self, bytes):
|
| - self.data.append(bytes)
|
| - # self.transport.loseConnection()
|
| -
|
| - def connectionLost(self, reason):
|
| - self.onConnLost.callback(self.data)
|
| -
|
| -
|
| -
|
| -class SimpleSymmetricProtocol(amp.AMP):
|
| -
|
| - def sendHello(self, text):
|
| - return self.callRemoteString(
|
| - "hello",
|
| - hello=text)
|
| -
|
| - def amp_HELLO(self, box):
|
| - return amp.Box(hello=box['hello'])
|
| -
|
| - def amp_HOWDOYOUDO(self, box):
|
| - return amp.QuitBox(howdoyoudo='world')
|
| -
|
| -
|
| -
|
| -class UnfriendlyGreeting(Exception):
|
| - """Greeting was insufficiently kind.
|
| - """
|
| -
|
| -class DeathThreat(Exception):
|
| - """Greeting was insufficiently kind.
|
| - """
|
| -
|
| -class UnknownProtocol(Exception):
|
| - """Asked to switch to the wrong protocol.
|
| - """
|
| -
|
| -
|
| -class TransportPeer(amp.Argument):
|
| - # this serves as some informal documentation for how to get variables from
|
| - # the protocol or your environment and pass them to methods as arguments.
|
| - def retrieve(self, d, name, proto):
|
| - return ''
|
| -
|
| - def fromStringProto(self, notAString, proto):
|
| - return proto.transport.getPeer()
|
| -
|
| - def toBox(self, name, strings, objects, proto):
|
| - return
|
| -
|
| -
|
| -
|
| -class Hello(amp.Command):
|
| -
|
| - commandName = 'hello'
|
| -
|
| - arguments = [('hello', amp.String()),
|
| - ('optional', amp.Boolean(optional=True)),
|
| - ('print', amp.Unicode(optional=True)),
|
| - ('from', TransportPeer(optional=True)),
|
| - ('mixedCase', amp.String(optional=True)),
|
| - ('dash-arg', amp.String(optional=True)),
|
| - ('underscore_arg', amp.String(optional=True))]
|
| -
|
| - response = [('hello', amp.String()),
|
| - ('print', amp.Unicode(optional=True))]
|
| -
|
| - errors = {UnfriendlyGreeting: 'UNFRIENDLY'}
|
| -
|
| - fatalErrors = {DeathThreat: 'DEAD'}
|
| -
|
| -class NoAnswerHello(Hello):
|
| - commandName = Hello.commandName
|
| - requiresAnswer = False
|
| -
|
| -class FutureHello(amp.Command):
|
| - commandName = 'hello'
|
| -
|
| - arguments = [('hello', amp.String()),
|
| - ('optional', amp.Boolean(optional=True)),
|
| - ('print', amp.Unicode(optional=True)),
|
| - ('from', TransportPeer(optional=True)),
|
| - ('bonus', amp.String(optional=True)), # addt'l arguments
|
| - # should generally be
|
| - # added at the end, and
|
| - # be optional...
|
| - ]
|
| -
|
| - response = [('hello', amp.String()),
|
| - ('print', amp.Unicode(optional=True))]
|
| -
|
| - errors = {UnfriendlyGreeting: 'UNFRIENDLY'}
|
| -
|
| -class WTF(amp.Command):
|
| - """
|
| - An example of an invalid command.
|
| - """
|
| -
|
| -
|
| -class BrokenReturn(amp.Command):
|
| - """ An example of a perfectly good command, but the handler is going to return
|
| - None...
|
| - """
|
| -
|
| - commandName = 'broken_return'
|
| -
|
| -class Goodbye(amp.Command):
|
| - # commandName left blank on purpose: this tests implicit command names.
|
| - response = [('goodbye', amp.String())]
|
| - responseType = amp.QuitBox
|
| -
|
| -class Howdoyoudo(amp.Command):
|
| - commandName = 'howdoyoudo'
|
| - # responseType = amp.QuitBox
|
| -
|
| -class WaitForever(amp.Command):
|
| - commandName = 'wait_forever'
|
| -
|
| -class GetList(amp.Command):
|
| - commandName = 'getlist'
|
| - arguments = [('length', amp.Integer())]
|
| - response = [('body', amp.AmpList([('x', amp.Integer())]))]
|
| -
|
| -class SecuredPing(amp.Command):
|
| - # XXX TODO: actually make this refuse to send over an insecure connection
|
| - response = [('pinged', amp.Boolean())]
|
| -
|
| -class TestSwitchProto(amp.ProtocolSwitchCommand):
|
| - commandName = 'Switch-Proto'
|
| -
|
| - arguments = [
|
| - ('name', amp.String()),
|
| - ]
|
| - errors = {UnknownProtocol: 'UNKNOWN'}
|
| -
|
| -class SingleUseFactory(protocol.ClientFactory):
|
| - def __init__(self, proto):
|
| - self.proto = proto
|
| - self.proto.factory = self
|
| -
|
| - def buildProtocol(self, addr):
|
| - p, self.proto = self.proto, None
|
| - return p
|
| -
|
| - reasonFailed = None
|
| -
|
| - def clientConnectionFailed(self, connector, reason):
|
| - self.reasonFailed = reason
|
| - return
|
| -
|
| -THING_I_DONT_UNDERSTAND = 'gwebol nargo'
|
| -class ThingIDontUnderstandError(Exception):
|
| - pass
|
| -
|
| -class FactoryNotifier(amp.AMP):
|
| - factory = None
|
| - def connectionMade(self):
|
| - if self.factory is not None:
|
| - self.factory.theProto = self
|
| - if hasattr(self.factory, 'onMade'):
|
| - self.factory.onMade.callback(None)
|
| -
|
| - def emitpong(self):
|
| - from twisted.internet.interfaces import ISSLTransport
|
| - if not ISSLTransport.providedBy(self.transport):
|
| - raise DeathThreat("only send secure pings over secure channels")
|
| - return {'pinged': True}
|
| - SecuredPing.responder(emitpong)
|
| -
|
| -
|
| -class SimpleSymmetricCommandProtocol(FactoryNotifier):
|
| - maybeLater = None
|
| - def __init__(self, onConnLost=None):
|
| - amp.AMP.__init__(self)
|
| - self.onConnLost = onConnLost
|
| -
|
| - def sendHello(self, text):
|
| - return self.callRemote(Hello, hello=text)
|
| -
|
| - def sendUnicodeHello(self, text, translation):
|
| - return self.callRemote(Hello, hello=text, Print=translation)
|
| -
|
| - greeted = False
|
| -
|
| - def cmdHello(self, hello, From, optional=None, Print=None,
|
| - mixedCase=None, dash_arg=None, underscore_arg=None):
|
| - assert From == self.transport.getPeer()
|
| - if hello == THING_I_DONT_UNDERSTAND:
|
| - raise ThingIDontUnderstandError()
|
| - if hello.startswith('fuck'):
|
| - raise UnfriendlyGreeting("Don't be a dick.")
|
| - if hello == 'die':
|
| - raise DeathThreat("aieeeeeeeee")
|
| - result = dict(hello=hello)
|
| - if Print is not None:
|
| - result.update(dict(Print=Print))
|
| - self.greeted = True
|
| - return result
|
| - Hello.responder(cmdHello)
|
| -
|
| - def cmdGetlist(self, length):
|
| - return {'body': [dict(x=1)] * length}
|
| - GetList.responder(cmdGetlist)
|
| -
|
| - def waitforit(self):
|
| - self.waiting = defer.Deferred()
|
| - return self.waiting
|
| - WaitForever.responder(waitforit)
|
| -
|
| - def howdo(self):
|
| - return dict(howdoyoudo='world')
|
| - Howdoyoudo.responder(howdo)
|
| -
|
| - def saybye(self):
|
| - return dict(goodbye="everyone")
|
| - Goodbye.responder(saybye)
|
| -
|
| - def switchToTestProtocol(self, fail=False):
|
| - if fail:
|
| - name = 'no-proto'
|
| - else:
|
| - name = 'test-proto'
|
| - p = TestProto(self.onConnLost, SWITCH_CLIENT_DATA)
|
| - return self.callRemote(
|
| - TestSwitchProto,
|
| - SingleUseFactory(p), name=name).addCallback(lambda ign: p)
|
| -
|
| - def switchit(self, name):
|
| - if name == 'test-proto':
|
| - return TestProto(self.onConnLost, SWITCH_SERVER_DATA)
|
| - raise UnknownProtocol(name)
|
| - TestSwitchProto.responder(switchit)
|
| -
|
| - def donothing(self):
|
| - return None
|
| - BrokenReturn.responder(donothing)
|
| -
|
| -
|
| -class DeferredSymmetricCommandProtocol(SimpleSymmetricCommandProtocol):
|
| - def switchit(self, name):
|
| - if name == 'test-proto':
|
| - self.maybeLaterProto = TestProto(self.onConnLost, SWITCH_SERVER_DATA)
|
| - self.maybeLater = defer.Deferred()
|
| - return self.maybeLater
|
| - raise UnknownProtocol(name)
|
| - TestSwitchProto.responder(switchit)
|
| -
|
| -class BadNoAnswerCommandProtocol(SimpleSymmetricCommandProtocol):
|
| - def badResponder(self, hello, From, optional=None, Print=None,
|
| - mixedCase=None, dash_arg=None, underscore_arg=None):
|
| - """
|
| - This responder does nothing and forgets to return a dictionary.
|
| - """
|
| - NoAnswerHello.responder(badResponder)
|
| -
|
| -class NoAnswerCommandProtocol(SimpleSymmetricCommandProtocol):
|
| - def goodNoAnswerResponder(self, hello, From, optional=None, Print=None,
|
| - mixedCase=None, dash_arg=None, underscore_arg=None):
|
| - return dict(hello=hello+"-noanswer")
|
| - NoAnswerHello.responder(goodNoAnswerResponder)
|
| -
|
| -def connectedServerAndClient(ServerClass=SimpleSymmetricProtocol,
|
| - ClientClass=SimpleSymmetricProtocol,
|
| - *a, **kw):
|
| - """Returns a 3-tuple: (client, server, pump)
|
| - """
|
| - return iosim.connectedServerAndClient(
|
| - ServerClass, ClientClass,
|
| - *a, **kw)
|
| -
|
| -class TotallyDumbProtocol(protocol.Protocol):
|
| - buf = ''
|
| - def dataReceived(self, data):
|
| - self.buf += data
|
| -
|
| -class LiteralAmp(amp.AMP):
|
| - def __init__(self):
|
| - self.boxes = []
|
| -
|
| - def ampBoxReceived(self, box):
|
| - self.boxes.append(box)
|
| - return
|
| -
|
| -class ParsingTest(unittest.TestCase):
|
| -
|
| - def test_booleanValues(self):
|
| - """
|
| - Verify that the Boolean parser parses 'True' and 'False', but nothing
|
| - else.
|
| - """
|
| - b = amp.Boolean()
|
| - self.assertEquals(b.fromString("True"), True)
|
| - self.assertEquals(b.fromString("False"), False)
|
| - self.assertRaises(TypeError, b.fromString, "ninja")
|
| - self.assertRaises(TypeError, b.fromString, "true")
|
| - self.assertRaises(TypeError, b.fromString, "TRUE")
|
| - self.assertEquals(b.toString(True), 'True')
|
| - self.assertEquals(b.toString(False), 'False')
|
| -
|
| - def test_pathValueRoundTrip(self):
|
| - """
|
| - Verify the 'Path' argument can parse and emit a file path.
|
| - """
|
| - fp = filepath.FilePath(self.mktemp())
|
| - p = amp.Path()
|
| - s = p.toString(fp)
|
| - v = p.fromString(s)
|
| - self.assertNotIdentical(fp, v) # sanity check
|
| - self.assertEquals(fp, v)
|
| -
|
| -
|
| - def test_sillyEmptyThing(self):
|
| - """
|
| - Test that empty boxes raise an error; they aren't supposed to be sent
|
| - on purpose.
|
| - """
|
| - a = amp.AMP()
|
| - return self.assertRaises(amp.NoEmptyBoxes, a.ampBoxReceived, amp.Box())
|
| -
|
| -
|
| - def test_ParsingRoundTrip(self):
|
| - """
|
| - Verify that various kinds of data make it through the encode/parse
|
| - round-trip unharmed.
|
| - """
|
| - c, s, p = connectedServerAndClient(ClientClass=LiteralAmp,
|
| - ServerClass=LiteralAmp)
|
| -
|
| - SIMPLE = ('simple', 'test')
|
| - CE = ('ceq', ': ')
|
| - CR = ('crtest', 'test\r')
|
| - LF = ('lftest', 'hello\n')
|
| - NEWLINE = ('newline', 'test\r\none\r\ntwo')
|
| - NEWLINE2 = ('newline2', 'test\r\none\r\n two')
|
| - BLANKLINE = ('newline3', 'test\r\n\r\nblank\r\n\r\nline')
|
| - BODYTEST = ('body', 'blah\r\n\r\ntesttest')
|
| -
|
| - testData = [
|
| - [SIMPLE],
|
| - [SIMPLE, BODYTEST],
|
| - [SIMPLE, CE],
|
| - [SIMPLE, CR],
|
| - [SIMPLE, CE, CR, LF],
|
| - [CE, CR, LF],
|
| - [SIMPLE, NEWLINE, CE, NEWLINE2],
|
| - [BODYTEST, SIMPLE, NEWLINE]
|
| - ]
|
| -
|
| - for test in testData:
|
| - jb = amp.Box()
|
| - jb.update(dict(test))
|
| - jb._sendTo(c)
|
| - p.flush()
|
| - self.assertEquals(s.boxes[-1], jb)
|
| -
|
| -
|
| -
|
| -class FakeLocator(object):
|
| - """
|
| - This is a fake implementation of the interface implied by
|
| - L{CommandLocator}.
|
| - """
|
| - def __init__(self):
|
| - """
|
| - Remember the given keyword arguments as a set of responders.
|
| - """
|
| - self.commands = {}
|
| -
|
| -
|
| - def locateResponder(self, commandName):
|
| - """
|
| - Look up and return a function passed as a keyword argument of the given
|
| - name to the constructor.
|
| - """
|
| - return self.commands[commandName]
|
| -
|
| -
|
| -class FakeSender:
|
| - """
|
| - This is a fake implementation of the 'box sender' interface implied by
|
| - L{AMP}.
|
| - """
|
| - def __init__(self):
|
| - """
|
| - Create a fake sender and initialize the list of received boxes and
|
| - unhandled errors.
|
| - """
|
| - self.sentBoxes = []
|
| - self.unhandledErrors = []
|
| - self.expectedErrors = 0
|
| -
|
| -
|
| - def expectError(self):
|
| - """
|
| - Expect one error, so that the test doesn't fail.
|
| - """
|
| - self.expectedErrors += 1
|
| -
|
| -
|
| - def sendBox(self, box):
|
| - """
|
| - Accept a box, but don't do anything.
|
| - """
|
| - self.sentBoxes.append(box)
|
| -
|
| -
|
| - def unhandledError(self, failure):
|
| - """
|
| - Deal with failures by instantly re-raising them for easier debugging.
|
| - """
|
| - self.expectedErrors -= 1
|
| - if self.expectedErrors < 0:
|
| - failure.raiseException()
|
| - else:
|
| - self.unhandledErrors.append(failure)
|
| -
|
| -
|
| -
|
| -class CommandDispatchTests(unittest.TestCase):
|
| - """
|
| - The AMP CommandDispatcher class dispatches converts AMP boxes into commands
|
| - and responses using Command.responder decorator.
|
| -
|
| - Note: Originally, AMP's factoring was such that many tests for this
|
| - functionality are now implemented as full round-trip tests in L{AMPTest}.
|
| - Future tests should be written at this level instead, to ensure API
|
| - compatibility and to provide more granular, readable units of test
|
| - coverage.
|
| - """
|
| -
|
| - def setUp(self):
|
| - """
|
| - Create a dispatcher to use.
|
| - """
|
| - self.locator = FakeLocator()
|
| - self.sender = FakeSender()
|
| - self.dispatcher = amp.BoxDispatcher(self.locator)
|
| - self.dispatcher.startReceivingBoxes(self.sender)
|
| -
|
| -
|
| - def test_receivedAsk(self):
|
| - """
|
| - L{CommandDispatcher.ampBoxReceived} should locate the appropriate
|
| - command in its responder lookup, based on the '_ask' key.
|
| - """
|
| - received = []
|
| - def thunk(box):
|
| - received.append(box)
|
| - return amp.Box({"hello": "goodbye"})
|
| - input = amp.Box(_command="hello",
|
| - _ask="test-command-id",
|
| - hello="world")
|
| - self.locator.commands['hello'] = thunk
|
| - self.dispatcher.ampBoxReceived(input)
|
| - self.assertEquals(received, [input])
|
| -
|
| -
|
| - def test_sendUnhandledError(self):
|
| - """
|
| - L{CommandDispatcher} should relay its unhandled errors in responding to
|
| - boxes to its boxSender.
|
| - """
|
| - err = RuntimeError("something went wrong, oh no")
|
| - self.sender.expectError()
|
| - self.dispatcher.unhandledError(Failure(err))
|
| - self.assertEqual(len(self.sender.unhandledErrors), 1)
|
| - self.assertEqual(self.sender.unhandledErrors[0].value, err)
|
| -
|
| -
|
| - def test_unhandledSerializationError(self):
|
| - """
|
| - Errors during serialization ought to be relayed to the sender's
|
| - unhandledError method.
|
| - """
|
| - err = RuntimeError("something undefined went wrong")
|
| - def thunk(result):
|
| - class BrokenBox(amp.Box):
|
| - def _sendTo(self, proto):
|
| - raise err
|
| - return BrokenBox()
|
| - self.locator.commands['hello'] = thunk
|
| - input = amp.Box(_command="hello",
|
| - _ask="test-command-id",
|
| - hello="world")
|
| - self.sender.expectError()
|
| - self.dispatcher.ampBoxReceived(input)
|
| - self.assertEquals(len(self.sender.unhandledErrors), 1)
|
| - self.assertEquals(self.sender.unhandledErrors[0].value, err)
|
| -
|
| -
|
| - def test_callRemote(self):
|
| - """
|
| - L{CommandDispatcher.callRemote} should emit a properly formatted '_ask'
|
| - box to its boxSender and record an outstanding L{Deferred}. When a
|
| - corresponding '_answer' packet is received, the L{Deferred} should be
|
| - fired, and the results translated via the given L{Command}'s response
|
| - de-serialization.
|
| - """
|
| - D = self.dispatcher.callRemote(Hello, hello='world')
|
| - self.assertEquals(self.sender.sentBoxes,
|
| - [amp.AmpBox(_command="hello",
|
| - _ask="1",
|
| - hello="world")])
|
| - answers = []
|
| - D.addCallback(answers.append)
|
| - self.assertEquals(answers, [])
|
| - self.dispatcher.ampBoxReceived(amp.AmpBox({'hello': "yay",
|
| - 'print': "ignored",
|
| - '_answer': "1"}))
|
| - self.assertEquals(answers, [dict(hello="yay",
|
| - Print=u"ignored")])
|
| -
|
| -
|
| -class SimpleGreeting(amp.Command):
|
| - """
|
| - A very simple greeting command that uses a few basic argument types.
|
| - """
|
| - commandName = 'simple'
|
| - arguments = [('greeting', amp.Unicode()),
|
| - ('cookie', amp.Integer())]
|
| - response = [('cookieplus', amp.Integer())]
|
| -
|
| -
|
| -class TestLocator(amp.CommandLocator):
|
| - """
|
| - A locator which implements a responder to a 'hello' command.
|
| - """
|
| - def __init__(self):
|
| - self.greetings = []
|
| -
|
| -
|
| - def greetingResponder(self, greeting, cookie):
|
| - self.greetings.append((greeting, cookie))
|
| - return dict(cookieplus=cookie + 3)
|
| - greetingResponder = SimpleGreeting.responder(greetingResponder)
|
| -
|
| -
|
| -
|
| -class OverrideLocatorAMP(amp.AMP):
|
| - def __init__(self):
|
| - amp.AMP.__init__(self)
|
| - self.customResponder = object()
|
| - self.expectations = {"custom": self.customResponder}
|
| - self.greetings = []
|
| -
|
| -
|
| - def lookupFunction(self, name):
|
| - """
|
| - Override the deprecated lookupFunction function.
|
| - """
|
| - if name in self.expectations:
|
| - result = self.expectations[name]
|
| - return result
|
| - else:
|
| - return super(OverrideLocatorAMP, self).lookupFunction(name)
|
| -
|
| -
|
| - def greetingResponder(self, greeting, cookie):
|
| - self.greetings.append((greeting, cookie))
|
| - return dict(cookieplus=cookie + 3)
|
| - greetingResponder = SimpleGreeting.responder(greetingResponder)
|
| -
|
| -
|
| -
|
| -
|
| -class CommandLocatorTests(unittest.TestCase):
|
| - """
|
| - The CommandLocator should enable users to specify responders to commands as
|
| - functions that take structured objects, annotated with metadata.
|
| - """
|
| -
|
| - def test_responderDecorator(self):
|
| - """
|
| - A method on a L{CommandLocator} subclass decorated with a L{Command}
|
| - subclass's L{responder} decorator should be returned from
|
| - locateResponder, wrapped in logic to serialize and deserialize its
|
| - arguments.
|
| - """
|
| - locator = TestLocator()
|
| - responderCallable = locator.locateResponder("simple")
|
| - result = responderCallable(amp.Box(greeting="ni hao", cookie="5"))
|
| - def done(values):
|
| - self.assertEquals(values, amp.AmpBox(cookieplus='8'))
|
| - return result.addCallback(done)
|
| -
|
| -
|
| - def test_lookupFunctionDeprecatedOverride(self):
|
| - """
|
| - Subclasses which override locateResponder under its old name,
|
| - lookupFunction, should have the override invoked instead. (This tests
|
| - an AMP subclass, because in the version of the code that could invoke
|
| - this deprecated code path, there was no L{CommandLocator}.)
|
| - """
|
| - locator = OverrideLocatorAMP()
|
| - customResponderObject = self.assertWarns(
|
| - PendingDeprecationWarning,
|
| - "Override locateResponder, not lookupFunction.",
|
| - __file__, lambda : locator.locateResponder("custom"))
|
| - self.assertEquals(locator.customResponder, customResponderObject)
|
| - # Make sure upcalling works too
|
| - normalResponderObject = self.assertWarns(
|
| - PendingDeprecationWarning,
|
| - "Override locateResponder, not lookupFunction.",
|
| - __file__, lambda : locator.locateResponder("simple"))
|
| - result = normalResponderObject(amp.Box(greeting="ni hao", cookie="5"))
|
| - def done(values):
|
| - self.assertEquals(values, amp.AmpBox(cookieplus='8'))
|
| - return result.addCallback(done)
|
| -
|
| -
|
| - def test_lookupFunctionDeprecatedInvoke(self):
|
| - """
|
| - Invoking locateResponder under its old name, lookupFunction, should
|
| - emit a deprecation warning, but do the same thing.
|
| - """
|
| - locator = TestLocator()
|
| - responderCallable = self.assertWarns(
|
| - PendingDeprecationWarning,
|
| - "Call locateResponder, not lookupFunction.", __file__,
|
| - lambda : locator.lookupFunction("simple"))
|
| - result = responderCallable(amp.Box(greeting="ni hao", cookie="5"))
|
| - def done(values):
|
| - self.assertEquals(values, amp.AmpBox(cookieplus='8'))
|
| - return result.addCallback(done)
|
| -
|
| -
|
| -
|
| -SWITCH_CLIENT_DATA = 'Success!'
|
| -SWITCH_SERVER_DATA = 'No, really. Success.'
|
| -
|
| -
|
| -class BinaryProtocolTests(unittest.TestCase):
|
| - """
|
| - Tests for L{amp.BinaryBoxProtocol}.
|
| - """
|
| -
|
| - def setUp(self):
|
| - """
|
| - Keep track of all boxes received by this test in its capacity as an
|
| - L{IBoxReceiver} implementor.
|
| - """
|
| - self.boxes = []
|
| - self.data = []
|
| -
|
| -
|
| - def startReceivingBoxes(self, sender):
|
| - """
|
| - Implement L{IBoxReceiver.startReceivingBoxes} to do nothing.
|
| - """
|
| -
|
| -
|
| - def ampBoxReceived(self, box):
|
| - """
|
| - A box was received by the protocol.
|
| - """
|
| - self.boxes.append(box)
|
| -
|
| - stopReason = None
|
| - def stopReceivingBoxes(self, reason):
|
| - """
|
| - Record the reason that we stopped receiving boxes.
|
| - """
|
| - self.stopReason = reason
|
| -
|
| -
|
| - # fake ITransport
|
| - def getPeer(self):
|
| - return 'no peer'
|
| -
|
| -
|
| - def getHost(self):
|
| - return 'no host'
|
| -
|
| -
|
| - def write(self, data):
|
| - self.data.append(data)
|
| -
|
| -
|
| - def test_receiveBoxStateMachine(self):
|
| - """
|
| - When a binary box protocol receives:
|
| - * a key
|
| - * a value
|
| - * an empty string
|
| - it should emit a box and send it to its boxReceiver.
|
| - """
|
| - a = amp.BinaryBoxProtocol(self)
|
| - a.stringReceived("hello")
|
| - a.stringReceived("world")
|
| - a.stringReceived("")
|
| - self.assertEquals(self.boxes, [amp.AmpBox(hello="world")])
|
| -
|
| -
|
| - def test_receiveBoxData(self):
|
| - """
|
| - When a binary box protocol receives the serialized form of an AMP box,
|
| - it should emit a similar box to its boxReceiver.
|
| - """
|
| - a = amp.BinaryBoxProtocol(self)
|
| - a.dataReceived(amp.Box({"testKey": "valueTest",
|
| - "anotherKey": "anotherValue"}).serialize())
|
| - self.assertEquals(self.boxes,
|
| - [amp.Box({"testKey": "valueTest",
|
| - "anotherKey": "anotherValue"})])
|
| -
|
| -
|
| - def test_sendBox(self):
|
| - """
|
| - When a binary box protocol sends a box, it should emit the serialized
|
| - bytes of that box to its transport.
|
| - """
|
| - a = amp.BinaryBoxProtocol(self)
|
| - a.makeConnection(self)
|
| - aBox = amp.Box({"testKey": "valueTest",
|
| - "someData": "hello"})
|
| - a.makeConnection(self)
|
| - a.sendBox(aBox)
|
| - self.assertEquals(''.join(self.data), aBox.serialize())
|
| -
|
| -
|
| - def test_connectionLostStopSendingBoxes(self):
|
| - """
|
| - When a binary box protocol loses its connection, it should notify its
|
| - box receiver that it has stopped receiving boxes.
|
| - """
|
| - a = amp.BinaryBoxProtocol(self)
|
| - a.makeConnection(self)
|
| - aBox = amp.Box({"sample": "data"})
|
| - a.makeConnection(self)
|
| - connectionFailure = Failure(RuntimeError())
|
| - a.connectionLost(connectionFailure)
|
| - self.assertIdentical(self.stopReason, connectionFailure)
|
| -
|
| -
|
| - def test_protocolSwitch(self):
|
| - """
|
| - L{BinaryBoxProtocol} has the capacity to switch to a different protocol
|
| - on a box boundary. When a protocol is in the process of switching, it
|
| - cannot receive traffic.
|
| - """
|
| - otherProto = TestProto(None, "outgoing data")
|
| - test = self
|
| - class SwitchyReceiver:
|
| - switched = False
|
| - def startReceivingBoxes(self, sender):
|
| - pass
|
| - def ampBoxReceived(self, box):
|
| - test.assertFalse(self.switched,
|
| - "Should only receive one box!")
|
| - self.switched = True
|
| - a._lockForSwitch()
|
| - a._switchTo(otherProto)
|
| - a = amp.BinaryBoxProtocol(SwitchyReceiver())
|
| - anyOldBox = amp.Box({"include": "lots",
|
| - "of": "data"})
|
| - a.makeConnection(self)
|
| - # Include a 0-length box at the beginning of the next protocol's data,
|
| - # to make sure that AMP doesn't eat the data or try to deliver extra
|
| - # boxes either...
|
| - moreThanOneBox = anyOldBox.serialize() + "\x00\x00Hello, world!"
|
| - a.dataReceived(moreThanOneBox)
|
| - self.assertIdentical(otherProto.transport, self)
|
| - self.assertEquals("".join(otherProto.data), "\x00\x00Hello, world!")
|
| - self.assertEquals(self.data, ["outgoing data"])
|
| - a.dataReceived("more data")
|
| - self.assertEquals("".join(otherProto.data),
|
| - "\x00\x00Hello, world!more data")
|
| - self.assertRaises(amp.ProtocolSwitched, a.sendBox, anyOldBox)
|
| -
|
| -
|
| - def test_protocolSwitchInvalidStates(self):
|
| - """
|
| - In order to make sure the protocol never gets any invalid data sent
|
| - into the middle of a box, it must be locked for switching before it is
|
| - switched. It can only be unlocked if the switch failed, and attempting
|
| - to send a box while it is locked should raise an exception.
|
| - """
|
| - a = amp.BinaryBoxProtocol(self)
|
| - a.makeConnection(self)
|
| - sampleBox = amp.Box({"some": "data"})
|
| - a._lockForSwitch()
|
| - self.assertRaises(amp.ProtocolSwitched, a.sendBox, sampleBox)
|
| - a._unlockFromSwitch()
|
| - a.sendBox(sampleBox)
|
| - self.assertEquals(''.join(self.data), sampleBox.serialize())
|
| - a._lockForSwitch()
|
| - otherProto = TestProto(None, "outgoing data")
|
| - a._switchTo(otherProto)
|
| - self.assertRaises(amp.ProtocolSwitched, a._unlockFromSwitch)
|
| -
|
| -
|
| - def test_protocolSwitchLoseConnection(self):
|
| - """
|
| - When the protocol is switched, it should notify its nested protocol of
|
| - disconnection.
|
| - """
|
| - class Loser(protocol.Protocol):
|
| - reason = None
|
| - def connectionLost(self, reason):
|
| - self.reason = reason
|
| - connectionLoser = Loser()
|
| - a = amp.BinaryBoxProtocol(self)
|
| - a.makeConnection(self)
|
| - a._lockForSwitch()
|
| - a._switchTo(connectionLoser)
|
| - connectionFailure = Failure(RuntimeError())
|
| - a.connectionLost(connectionFailure)
|
| - self.assertEquals(connectionLoser.reason, connectionFailure)
|
| -
|
| -
|
| - def test_protocolSwitchLoseClientConnection(self):
|
| - """
|
| - When the protocol is switched, it should notify its nested client
|
| - protocol factory of disconnection.
|
| - """
|
| - class ClientLoser:
|
| - reason = None
|
| - def clientConnectionLost(self, connector, reason):
|
| - self.reason = reason
|
| - a = amp.BinaryBoxProtocol(self)
|
| - connectionLoser = protocol.Protocol()
|
| - clientLoser = ClientLoser()
|
| - a.makeConnection(self)
|
| - a._lockForSwitch()
|
| - a._switchTo(connectionLoser, clientLoser)
|
| - connectionFailure = Failure(RuntimeError())
|
| - a.connectionLost(connectionFailure)
|
| - self.assertEquals(clientLoser.reason, connectionFailure)
|
| -
|
| -
|
| -
|
| -class AMPTest(unittest.TestCase):
|
| -
|
| - def test_interfaceDeclarations(self):
|
| - """
|
| - The classes in the amp module ought to implement the interfaces that
|
| - are declared for their benefit.
|
| - """
|
| - for interface, implementation in [(amp.IBoxSender, amp.BinaryBoxProtocol),
|
| - (amp.IBoxReceiver, amp.BoxDispatcher),
|
| - (amp.IResponderLocator, amp.CommandLocator),
|
| - (amp.IResponderLocator, amp.SimpleStringLocator),
|
| - (amp.IBoxSender, amp.AMP),
|
| - (amp.IBoxReceiver, amp.AMP),
|
| - (amp.IResponderLocator, amp.AMP)]:
|
| - self.failUnless(interface.implementedBy(implementation),
|
| - "%s does not implements(%s)" % (implementation, interface))
|
| -
|
| -
|
| - def test_helloWorld(self):
|
| - """
|
| - Verify that a simple command can be sent and its response received with
|
| - the simple low-level string-based API.
|
| - """
|
| - c, s, p = connectedServerAndClient()
|
| - L = []
|
| - HELLO = 'world'
|
| - c.sendHello(HELLO).addCallback(L.append)
|
| - p.flush()
|
| - self.assertEquals(L[0]['hello'], HELLO)
|
| -
|
| -
|
| - def test_wireFormatRoundTrip(self):
|
| - """
|
| - Verify that mixed-case, underscored and dashed arguments are mapped to
|
| - their python names properly.
|
| - """
|
| - c, s, p = connectedServerAndClient()
|
| - L = []
|
| - HELLO = 'world'
|
| - c.sendHello(HELLO).addCallback(L.append)
|
| - p.flush()
|
| - self.assertEquals(L[0]['hello'], HELLO)
|
| -
|
| -
|
| - def test_helloWorldUnicode(self):
|
| - """
|
| - Verify that unicode arguments can be encoded and decoded.
|
| - """
|
| - c, s, p = connectedServerAndClient(
|
| - ServerClass=SimpleSymmetricCommandProtocol,
|
| - ClientClass=SimpleSymmetricCommandProtocol)
|
| - L = []
|
| - HELLO = 'world'
|
| - HELLO_UNICODE = 'wor\u1234ld'
|
| - c.sendUnicodeHello(HELLO, HELLO_UNICODE).addCallback(L.append)
|
| - p.flush()
|
| - self.assertEquals(L[0]['hello'], HELLO)
|
| - self.assertEquals(L[0]['Print'], HELLO_UNICODE)
|
| -
|
| -
|
| - def test_unknownCommandLow(self):
|
| - """
|
| - Verify that unknown commands using low-level APIs will be rejected with an
|
| - error, but will NOT terminate the connection.
|
| - """
|
| - c, s, p = connectedServerAndClient()
|
| - L = []
|
| - def clearAndAdd(e):
|
| - """
|
| - You can't propagate the error...
|
| - """
|
| - e.trap(amp.UnhandledCommand)
|
| - return "OK"
|
| - c.callRemoteString("WTF").addErrback(clearAndAdd).addCallback(L.append)
|
| - p.flush()
|
| - self.assertEquals(L.pop(), "OK")
|
| - HELLO = 'world'
|
| - c.sendHello(HELLO).addCallback(L.append)
|
| - p.flush()
|
| - self.assertEquals(L[0]['hello'], HELLO)
|
| -
|
| -
|
| - def test_unknownCommandHigh(self):
|
| - """
|
| - Verify that unknown commands using high-level APIs will be rejected with an
|
| - error, but will NOT terminate the connection.
|
| - """
|
| - c, s, p = connectedServerAndClient()
|
| - L = []
|
| - def clearAndAdd(e):
|
| - """
|
| - You can't propagate the error...
|
| - """
|
| - e.trap(amp.UnhandledCommand)
|
| - return "OK"
|
| - c.callRemote(WTF).addErrback(clearAndAdd).addCallback(L.append)
|
| - p.flush()
|
| - self.assertEquals(L.pop(), "OK")
|
| - HELLO = 'world'
|
| - c.sendHello(HELLO).addCallback(L.append)
|
| - p.flush()
|
| - self.assertEquals(L[0]['hello'], HELLO)
|
| -
|
| -
|
| - def test_brokenReturnValue(self):
|
| - """
|
| - It can be very confusing if you write some code which responds to a
|
| - command, but gets the return value wrong. Most commonly you end up
|
| - returning None instead of a dictionary.
|
| -
|
| - Verify that if that happens, the framework logs a useful error.
|
| - """
|
| - L = []
|
| - SimpleSymmetricCommandProtocol().dispatchCommand(
|
| - amp.AmpBox(_command=BrokenReturn.commandName)).addErrback(L.append)
|
| - blr = L[0].trap(amp.BadLocalReturn)
|
| - self.failUnlessIn('None', repr(L[0].value))
|
| -
|
| -
|
| -
|
| - def test_unknownArgument(self):
|
| - """
|
| - Verify that unknown arguments are ignored, and not passed to a Python
|
| - function which can't accept them.
|
| - """
|
| - c, s, p = connectedServerAndClient(
|
| - ServerClass=SimpleSymmetricCommandProtocol,
|
| - ClientClass=SimpleSymmetricCommandProtocol)
|
| - L = []
|
| - HELLO = 'world'
|
| - # c.sendHello(HELLO).addCallback(L.append)
|
| - c.callRemote(FutureHello,
|
| - hello=HELLO,
|
| - bonus="I'm not in the book!").addCallback(
|
| - L.append)
|
| - p.flush()
|
| - self.assertEquals(L[0]['hello'], HELLO)
|
| -
|
| -
|
| - def test_simpleReprs(self):
|
| - """
|
| - Verify that the various Box objects repr properly, for debugging.
|
| - """
|
| - self.assertEquals(type(repr(amp._TLSBox())), str)
|
| - self.assertEquals(type(repr(amp._SwitchBox('a'))), str)
|
| - self.assertEquals(type(repr(amp.QuitBox())), str)
|
| - self.assertEquals(type(repr(amp.AmpBox())), str)
|
| - self.failUnless("AmpBox" in repr(amp.AmpBox()))
|
| -
|
| - def test_keyTooLong(self):
|
| - """
|
| - Verify that a key that is too long will immediately raise a synchronous
|
| - exception.
|
| - """
|
| - c, s, p = connectedServerAndClient()
|
| - L = []
|
| - x = "H" * (0xff+1)
|
| - tl = self.assertRaises(amp.TooLong,
|
| - c.callRemoteString, "Hello",
|
| - **{x: "hi"})
|
| - self.failUnless(tl.isKey)
|
| - self.failUnless(tl.isLocal)
|
| - self.failUnlessIdentical(tl.keyName, None)
|
| - self.failUnlessIdentical(tl.value, x)
|
| - self.failUnless(str(len(x)) in repr(tl))
|
| - self.failUnless("key" in repr(tl))
|
| -
|
| -
|
| - def test_valueTooLong(self):
|
| - """
|
| - Verify that attempting to send value longer than 64k will immediately
|
| - raise an exception.
|
| - """
|
| - c, s, p = connectedServerAndClient()
|
| - L = []
|
| - x = "H" * (0xffff+1)
|
| - tl = self.assertRaises(amp.TooLong, c.sendHello, x)
|
| - p.flush()
|
| - self.failIf(tl.isKey)
|
| - self.failUnless(tl.isLocal)
|
| - self.failUnlessIdentical(tl.keyName, 'hello')
|
| - self.failUnlessIdentical(tl.value, x)
|
| - self.failUnless(str(len(x)) in repr(tl))
|
| - self.failUnless("value" in repr(tl))
|
| - self.failUnless('hello' in repr(tl))
|
| -
|
| -
|
| - def test_helloWorldCommand(self):
|
| - """
|
| - Verify that a simple command can be sent and its response received with
|
| - the high-level value parsing API.
|
| - """
|
| - c, s, p = connectedServerAndClient(
|
| - ServerClass=SimpleSymmetricCommandProtocol,
|
| - ClientClass=SimpleSymmetricCommandProtocol)
|
| - L = []
|
| - HELLO = 'world'
|
| - c.sendHello(HELLO).addCallback(L.append)
|
| - p.flush()
|
| - self.assertEquals(L[0]['hello'], HELLO)
|
| -
|
| -
|
| - def test_helloErrorHandling(self):
|
| - """
|
| - Verify that if a known error type is raised and handled, it will be
|
| - properly relayed to the other end of the connection and translated into
|
| - an exception, and no error will be logged.
|
| - """
|
| - L=[]
|
| - c, s, p = connectedServerAndClient(
|
| - ServerClass=SimpleSymmetricCommandProtocol,
|
| - ClientClass=SimpleSymmetricCommandProtocol)
|
| - HELLO = 'fuck you'
|
| - c.sendHello(HELLO).addErrback(L.append)
|
| - p.flush()
|
| - L[0].trap(UnfriendlyGreeting)
|
| - self.assertEquals(str(L[0].value), "Don't be a dick.")
|
| -
|
| -
|
| - def test_helloFatalErrorHandling(self):
|
| - """
|
| - Verify that if a known, fatal error type is raised and handled, it will
|
| - be properly relayed to the other end of the connection and translated
|
| - into an exception, no error will be logged, and the connection will be
|
| - terminated.
|
| - """
|
| - L=[]
|
| - c, s, p = connectedServerAndClient(
|
| - ServerClass=SimpleSymmetricCommandProtocol,
|
| - ClientClass=SimpleSymmetricCommandProtocol)
|
| - HELLO = 'die'
|
| - c.sendHello(HELLO).addErrback(L.append)
|
| - p.flush()
|
| - L.pop().trap(DeathThreat)
|
| - c.sendHello(HELLO).addErrback(L.append)
|
| - p.flush()
|
| - L.pop().trap(error.ConnectionDone)
|
| -
|
| -
|
| -
|
| - def test_helloNoErrorHandling(self):
|
| - """
|
| - Verify that if an unknown error type is raised, it will be relayed to
|
| - the other end of the connection and translated into an exception, it
|
| - will be logged, and then the connection will be dropped.
|
| - """
|
| - L=[]
|
| - c, s, p = connectedServerAndClient(
|
| - ServerClass=SimpleSymmetricCommandProtocol,
|
| - ClientClass=SimpleSymmetricCommandProtocol)
|
| - HELLO = THING_I_DONT_UNDERSTAND
|
| - c.sendHello(HELLO).addErrback(L.append)
|
| - p.flush()
|
| - ure = L.pop()
|
| - ure.trap(amp.UnknownRemoteError)
|
| - c.sendHello(HELLO).addErrback(L.append)
|
| - cl = L.pop()
|
| - cl.trap(error.ConnectionDone)
|
| - # The exception should have been logged.
|
| - self.failUnless(self.flushLoggedErrors(ThingIDontUnderstandError))
|
| -
|
| -
|
| -
|
| - def test_lateAnswer(self):
|
| - """
|
| - Verify that a command that does not get answered until after the
|
| - connection terminates will not cause any errors.
|
| - """
|
| - c, s, p = connectedServerAndClient(
|
| - ServerClass=SimpleSymmetricCommandProtocol,
|
| - ClientClass=SimpleSymmetricCommandProtocol)
|
| - L = []
|
| - HELLO = 'world'
|
| - c.callRemote(WaitForever).addErrback(L.append)
|
| - p.flush()
|
| - self.assertEquals(L, [])
|
| - s.transport.loseConnection()
|
| - p.flush()
|
| - L.pop().trap(error.ConnectionDone)
|
| - # Just make sure that it doesn't error...
|
| - s.waiting.callback({})
|
| - return s.waiting
|
| -
|
| -
|
| - def test_requiresNoAnswer(self):
|
| - """
|
| - Verify that a command that requires no answer is run.
|
| - """
|
| - L=[]
|
| - c, s, p = connectedServerAndClient(
|
| - ServerClass=SimpleSymmetricCommandProtocol,
|
| - ClientClass=SimpleSymmetricCommandProtocol)
|
| - HELLO = 'world'
|
| - c.callRemote(NoAnswerHello, hello=HELLO)
|
| - p.flush()
|
| - self.failUnless(s.greeted)
|
| -
|
| -
|
| - def test_requiresNoAnswerFail(self):
|
| - """
|
| - Verify that commands sent after a failed no-answer request do not complete.
|
| - """
|
| - L=[]
|
| - c, s, p = connectedServerAndClient(
|
| - ServerClass=SimpleSymmetricCommandProtocol,
|
| - ClientClass=SimpleSymmetricCommandProtocol)
|
| - HELLO = 'fuck you'
|
| - c.callRemote(NoAnswerHello, hello=HELLO)
|
| - p.flush()
|
| - # This should be logged locally.
|
| - self.failUnless(self.flushLoggedErrors(amp.RemoteAmpError))
|
| - HELLO = 'world'
|
| - c.callRemote(Hello, hello=HELLO).addErrback(L.append)
|
| - p.flush()
|
| - L.pop().trap(error.ConnectionDone)
|
| - self.failIf(s.greeted)
|
| -
|
| -
|
| - def test_noAnswerResponderBadAnswer(self):
|
| - """
|
| - Verify that responders of requiresAnswer=False commands have to return
|
| - a dictionary anyway.
|
| -
|
| - (requiresAnswer is a hint from the _client_ - the server may be called
|
| - upon to answer commands in any case, if the client wants to know when
|
| - they complete.)
|
| - """
|
| - c, s, p = connectedServerAndClient(
|
| - ServerClass=BadNoAnswerCommandProtocol,
|
| - ClientClass=SimpleSymmetricCommandProtocol)
|
| - c.callRemote(NoAnswerHello, hello="hello")
|
| - p.flush()
|
| - le = self.flushLoggedErrors(amp.BadLocalReturn)
|
| - self.assertEquals(len(le), 1)
|
| -
|
| -
|
| - def test_noAnswerResponderAskedForAnswer(self):
|
| - """
|
| - Verify that responders with requiresAnswer=False will actually respond
|
| - if the client sets requiresAnswer=True. In other words, verify that
|
| - requiresAnswer is a hint honored only by the client.
|
| - """
|
| - c, s, p = connectedServerAndClient(
|
| - ServerClass=NoAnswerCommandProtocol,
|
| - ClientClass=SimpleSymmetricCommandProtocol)
|
| - L = []
|
| - c.callRemote(Hello, hello="Hello!").addCallback(L.append)
|
| - p.flush()
|
| - self.assertEquals(len(L), 1)
|
| - self.assertEquals(L, [dict(hello="Hello!-noanswer",
|
| - Print=None)]) # Optional response argument
|
| -
|
| -
|
| - def test_ampListCommand(self):
|
| - """
|
| - Test encoding of an argument that uses the AmpList encoding.
|
| - """
|
| - c, s, p = connectedServerAndClient(
|
| - ServerClass=SimpleSymmetricCommandProtocol,
|
| - ClientClass=SimpleSymmetricCommandProtocol)
|
| - L = []
|
| - c.callRemote(GetList, length=10).addCallback(L.append)
|
| - p.flush()
|
| - values = L.pop().get('body')
|
| - self.assertEquals(values, [{'x': 1}] * 10)
|
| -
|
| -
|
| - def test_failEarlyOnArgSending(self):
|
| - """
|
| - Verify that if we pass an invalid argument list (omitting an argument), an
|
| - exception will be raised.
|
| - """
|
| - okayCommand = Hello(hello="What?")
|
| - self.assertRaises(amp.InvalidSignature, Hello)
|
| -
|
| -
|
| - def test_doubleProtocolSwitch(self):
|
| - """
|
| - As a debugging aid, a protocol system should raise a
|
| - L{ProtocolSwitched} exception when asked to switch a protocol that is
|
| - already switched.
|
| - """
|
| - serverDeferred = defer.Deferred()
|
| - serverProto = SimpleSymmetricCommandProtocol(serverDeferred)
|
| - clientDeferred = defer.Deferred()
|
| - clientProto = SimpleSymmetricCommandProtocol(clientDeferred)
|
| - c, s, p = connectedServerAndClient(ServerClass=lambda: serverProto,
|
| - ClientClass=lambda: clientProto)
|
| - def switched(result):
|
| - self.assertRaises(amp.ProtocolSwitched, c.switchToTestProtocol)
|
| - self.testSucceeded = True
|
| - c.switchToTestProtocol().addCallback(switched)
|
| - p.flush()
|
| - self.failUnless(self.testSucceeded)
|
| -
|
| -
|
| - def test_protocolSwitch(self, switcher=SimpleSymmetricCommandProtocol,
|
| - spuriousTraffic=False,
|
| - spuriousError=False):
|
| - """
|
| - Verify that it is possible to switch to another protocol mid-connection and
|
| - send data to it successfully.
|
| - """
|
| - self.testSucceeded = False
|
| -
|
| - serverDeferred = defer.Deferred()
|
| - serverProto = switcher(serverDeferred)
|
| - clientDeferred = defer.Deferred()
|
| - clientProto = switcher(clientDeferred)
|
| - c, s, p = connectedServerAndClient(ServerClass=lambda: serverProto,
|
| - ClientClass=lambda: clientProto)
|
| -
|
| - if spuriousTraffic:
|
| - wfdr = [] # remote
|
| - wfd = c.callRemote(WaitForever).addErrback(wfdr.append)
|
| - switchDeferred = c.switchToTestProtocol()
|
| - if spuriousTraffic:
|
| - self.assertRaises(amp.ProtocolSwitched, c.sendHello, 'world')
|
| -
|
| - def cbConnsLost(((serverSuccess, serverData),
|
| - (clientSuccess, clientData))):
|
| - self.failUnless(serverSuccess)
|
| - self.failUnless(clientSuccess)
|
| - self.assertEquals(''.join(serverData), SWITCH_CLIENT_DATA)
|
| - self.assertEquals(''.join(clientData), SWITCH_SERVER_DATA)
|
| - self.testSucceeded = True
|
| -
|
| - def cbSwitch(proto):
|
| - return defer.DeferredList(
|
| - [serverDeferred, clientDeferred]).addCallback(cbConnsLost)
|
| -
|
| - switchDeferred.addCallback(cbSwitch)
|
| - p.flush()
|
| - if serverProto.maybeLater is not None:
|
| - serverProto.maybeLater.callback(serverProto.maybeLaterProto)
|
| - p.flush()
|
| - if spuriousTraffic:
|
| - # switch is done here; do this here to make sure that if we're
|
| - # going to corrupt the connection, we do it before it's closed.
|
| - if spuriousError:
|
| - s.waiting.errback(amp.RemoteAmpError(
|
| - "SPURIOUS",
|
| - "Here's some traffic in the form of an error."))
|
| - else:
|
| - s.waiting.callback({})
|
| - p.flush()
|
| - c.transport.loseConnection() # close it
|
| - p.flush()
|
| - self.failUnless(self.testSucceeded)
|
| -
|
| -
|
| - def test_protocolSwitchDeferred(self):
|
| - """
|
| - Verify that protocol-switching even works if the value returned from
|
| - the command that does the switch is deferred.
|
| - """
|
| - return self.test_protocolSwitch(switcher=DeferredSymmetricCommandProtocol)
|
| -
|
| -
|
| - def test_protocolSwitchFail(self, switcher=SimpleSymmetricCommandProtocol):
|
| - """
|
| - Verify that if we try to switch protocols and it fails, the connection
|
| - stays up and we can go back to speaking AMP.
|
| - """
|
| - self.testSucceeded = False
|
| -
|
| - serverDeferred = defer.Deferred()
|
| - serverProto = switcher(serverDeferred)
|
| - clientDeferred = defer.Deferred()
|
| - clientProto = switcher(clientDeferred)
|
| - c, s, p = connectedServerAndClient(ServerClass=lambda: serverProto,
|
| - ClientClass=lambda: clientProto)
|
| - L = []
|
| - switchDeferred = c.switchToTestProtocol(fail=True).addErrback(L.append)
|
| - p.flush()
|
| - L.pop().trap(UnknownProtocol)
|
| - self.failIf(self.testSucceeded)
|
| - # It's a known error, so let's send a "hello" on the same connection;
|
| - # it should work.
|
| - c.sendHello('world').addCallback(L.append)
|
| - p.flush()
|
| - self.assertEqual(L.pop()['hello'], 'world')
|
| -
|
| -
|
| - def test_trafficAfterSwitch(self):
|
| - """
|
| - Verify that attempts to send traffic after a switch will not corrupt
|
| - the nested protocol.
|
| - """
|
| - return self.test_protocolSwitch(spuriousTraffic=True)
|
| -
|
| -
|
| - def test_errorAfterSwitch(self):
|
| - """
|
| - Returning an error after a protocol switch should record the underlying
|
| - error.
|
| - """
|
| - return self.test_protocolSwitch(spuriousTraffic=True,
|
| - spuriousError=True)
|
| -
|
| -
|
| - def test_quitBoxQuits(self):
|
| - """
|
| - Verify that commands with a responseType of QuitBox will in fact
|
| - terminate the connection.
|
| - """
|
| - c, s, p = connectedServerAndClient(
|
| - ServerClass=SimpleSymmetricCommandProtocol,
|
| - ClientClass=SimpleSymmetricCommandProtocol)
|
| -
|
| - L = []
|
| - HELLO = 'world'
|
| - GOODBYE = 'everyone'
|
| - c.sendHello(HELLO).addCallback(L.append)
|
| - p.flush()
|
| - self.assertEquals(L.pop()['hello'], HELLO)
|
| - c.callRemote(Goodbye).addCallback(L.append)
|
| - p.flush()
|
| - self.assertEquals(L.pop()['goodbye'], GOODBYE)
|
| - c.sendHello(HELLO).addErrback(L.append)
|
| - L.pop().trap(error.ConnectionDone)
|
| -
|
| -
|
| - def test_basicLiteralEmit(self):
|
| - """
|
| - Verify that the command dictionaries for a callRemoteN look correct
|
| - after being serialized and parsed.
|
| - """
|
| - c, s, p = connectedServerAndClient()
|
| - L = []
|
| - s.ampBoxReceived = L.append
|
| - c.callRemote(Hello, hello='hello test', mixedCase='mixed case arg test',
|
| - dash_arg='x', underscore_arg='y')
|
| - p.flush()
|
| - self.assertEquals(len(L), 1)
|
| - for k, v in [('_command', Hello.commandName),
|
| - ('hello', 'hello test'),
|
| - ('mixedCase', 'mixed case arg test'),
|
| - ('dash-arg', 'x'),
|
| - ('underscore_arg', 'y')]:
|
| - self.assertEquals(L[-1].pop(k), v)
|
| - L[-1].pop('_ask')
|
| - self.assertEquals(L[-1], {})
|
| -
|
| -
|
| - def test_basicStructuredEmit(self):
|
| - """
|
| - Verify that a call similar to basicLiteralEmit's is handled properly with
|
| - high-level quoting and passing to Python methods, and that argument
|
| - names are correctly handled.
|
| - """
|
| - L = []
|
| - class StructuredHello(amp.AMP):
|
| - def h(self, *a, **k):
|
| - L.append((a, k))
|
| - return dict(hello='aaa')
|
| - Hello.responder(h)
|
| - c, s, p = connectedServerAndClient(ServerClass=StructuredHello)
|
| - c.callRemote(Hello, hello='hello test', mixedCase='mixed case arg test',
|
| - dash_arg='x', underscore_arg='y').addCallback(L.append)
|
| - p.flush()
|
| - self.assertEquals(len(L), 2)
|
| - self.assertEquals(L[0],
|
| - ((), dict(
|
| - hello='hello test',
|
| - mixedCase='mixed case arg test',
|
| - dash_arg='x',
|
| - underscore_arg='y',
|
| -
|
| - # XXX - should optional arguments just not be passed?
|
| - # passing None seems a little odd, looking at the way it
|
| - # turns out here... -glyph
|
| - From=('file', 'file'),
|
| - Print=None,
|
| - optional=None,
|
| - )))
|
| - self.assertEquals(L[1], dict(Print=None, hello='aaa'))
|
| -
|
| -class PretendRemoteCertificateAuthority:
|
| - def checkIsPretendRemote(self):
|
| - return True
|
| -
|
| -class IOSimCert:
|
| - verifyCount = 0
|
| -
|
| - def options(self, *ign):
|
| - return self
|
| -
|
| - def iosimVerify(self, otherCert):
|
| - """
|
| - This isn't a real certificate, and wouldn't work on a real socket, but
|
| - iosim specifies a different API so that we don't have to do any crypto
|
| - math to demonstrate that the right functions get called in the right
|
| - places.
|
| - """
|
| - assert otherCert is self
|
| - self.verifyCount += 1
|
| - return True
|
| -
|
| -class OKCert(IOSimCert):
|
| - def options(self, x):
|
| - assert x.checkIsPretendRemote()
|
| - return self
|
| -
|
| -class GrumpyCert(IOSimCert):
|
| - def iosimVerify(self, otherCert):
|
| - self.verifyCount += 1
|
| - return False
|
| -
|
| -class DroppyCert(IOSimCert):
|
| - def __init__(self, toDrop):
|
| - self.toDrop = toDrop
|
| -
|
| - def iosimVerify(self, otherCert):
|
| - self.verifyCount += 1
|
| - self.toDrop.loseConnection()
|
| - return True
|
| -
|
| -class SecurableProto(FactoryNotifier):
|
| -
|
| - factory = None
|
| -
|
| - def verifyFactory(self):
|
| - return [PretendRemoteCertificateAuthority()]
|
| -
|
| - def getTLSVars(self):
|
| - cert = self.certFactory()
|
| - verify = self.verifyFactory()
|
| - return dict(
|
| - tls_localCertificate=cert,
|
| - tls_verifyAuthorities=verify)
|
| - amp.StartTLS.responder(getTLSVars)
|
| -
|
| -
|
| -
|
| -class TLSTest(unittest.TestCase):
|
| - def test_startingTLS(self):
|
| - """
|
| - Verify that starting TLS and succeeding at handshaking sends all the
|
| - notifications to all the right places.
|
| - """
|
| - cli, svr, p = connectedServerAndClient(
|
| - ServerClass=SecurableProto,
|
| - ClientClass=SecurableProto)
|
| -
|
| - okc = OKCert()
|
| - svr.certFactory = lambda : okc
|
| -
|
| - cli.callRemote(
|
| - amp.StartTLS, tls_localCertificate=okc,
|
| - tls_verifyAuthorities=[PretendRemoteCertificateAuthority()])
|
| -
|
| - # let's buffer something to be delivered securely
|
| - L = []
|
| - d = cli.callRemote(SecuredPing).addCallback(L.append)
|
| - p.flush()
|
| - # once for client once for server
|
| - self.assertEquals(okc.verifyCount, 2)
|
| - L = []
|
| - d = cli.callRemote(SecuredPing).addCallback(L.append)
|
| - p.flush()
|
| - self.assertEqual(L[0], {'pinged': True})
|
| -
|
| -
|
| - def test_startTooManyTimes(self):
|
| - """
|
| - Verify that the protocol will complain if we attempt to renegotiate TLS,
|
| - which we don't support.
|
| - """
|
| - cli, svr, p = connectedServerAndClient(
|
| - ServerClass=SecurableProto,
|
| - ClientClass=SecurableProto)
|
| -
|
| - okc = OKCert()
|
| - svr.certFactory = lambda : okc
|
| -
|
| - cli.callRemote(amp.StartTLS,
|
| - tls_localCertificate=okc,
|
| - tls_verifyAuthorities=[PretendRemoteCertificateAuthority()])
|
| - p.flush()
|
| - cli.noPeerCertificate = True # this is totally fake
|
| - self.assertRaises(
|
| - amp.OnlyOneTLS,
|
| - cli.callRemote,
|
| - amp.StartTLS,
|
| - tls_localCertificate=okc,
|
| - tls_verifyAuthorities=[PretendRemoteCertificateAuthority()])
|
| -
|
| -
|
| - def test_negotiationFailed(self):
|
| - """
|
| - Verify that starting TLS and failing on both sides at handshaking sends
|
| - notifications to all the right places and terminates the connection.
|
| - """
|
| -
|
| - badCert = GrumpyCert()
|
| -
|
| - cli, svr, p = connectedServerAndClient(
|
| - ServerClass=SecurableProto,
|
| - ClientClass=SecurableProto)
|
| - svr.certFactory = lambda : badCert
|
| -
|
| - cli.callRemote(amp.StartTLS,
|
| - tls_localCertificate=badCert)
|
| -
|
| - p.flush()
|
| - # once for client once for server - but both fail
|
| - self.assertEquals(badCert.verifyCount, 2)
|
| - d = cli.callRemote(SecuredPing)
|
| - p.flush()
|
| - self.assertFailure(d, iosim.OpenSSLVerifyError)
|
| -
|
| -
|
| - def test_negotiationFailedByClosing(self):
|
| - """
|
| - Verify that starting TLS and failing by way of a lost connection
|
| - notices that it is probably an SSL problem.
|
| - """
|
| -
|
| - cli, svr, p = connectedServerAndClient(
|
| - ServerClass=SecurableProto,
|
| - ClientClass=SecurableProto)
|
| - droppyCert = DroppyCert(svr.transport)
|
| - svr.certFactory = lambda : droppyCert
|
| -
|
| - secure = cli.callRemote(amp.StartTLS,
|
| - tls_localCertificate=droppyCert)
|
| -
|
| - p.flush()
|
| -
|
| - self.assertEquals(droppyCert.verifyCount, 2)
|
| -
|
| - d = cli.callRemote(SecuredPing)
|
| - p.flush()
|
| -
|
| - # it might be a good idea to move this exception somewhere more
|
| - # reasonable.
|
| - self.assertFailure(d, error.PeerVerifyError)
|
| -
|
| -
|
| -
|
| -class InheritedError(Exception):
|
| - """
|
| - This error is used to check inheritance.
|
| - """
|
| -
|
| -
|
| -
|
| -class OtherInheritedError(Exception):
|
| - """
|
| - This is a distinct error for checking inheritance.
|
| - """
|
| -
|
| -
|
| -
|
| -class BaseCommand(amp.Command):
|
| - """
|
| - This provides a command that will be subclassed.
|
| - """
|
| - errors = {InheritedError: 'INHERITED_ERROR'}
|
| -
|
| -
|
| -
|
| -class InheritedCommand(BaseCommand):
|
| - """
|
| - This is a command which subclasses another command but does not override
|
| - anything.
|
| - """
|
| -
|
| -
|
| -
|
| -class AddErrorsCommand(BaseCommand):
|
| - """
|
| - This is a command which subclasses another command but adds errors to the
|
| - list.
|
| - """
|
| - arguments = [('other', amp.Boolean())]
|
| - errors = {OtherInheritedError: 'OTHER_INHERITED_ERROR'}
|
| -
|
| -
|
| -
|
| -class NormalCommandProtocol(amp.AMP):
|
| - """
|
| - This is a protocol which responds to L{BaseCommand}, and is used to test
|
| - that inheritance does not interfere with the normal handling of errors.
|
| - """
|
| - def resp(self):
|
| - raise InheritedError()
|
| - BaseCommand.responder(resp)
|
| -
|
| -
|
| -
|
| -class InheritedCommandProtocol(amp.AMP):
|
| - """
|
| - This is a protocol which responds to L{InheritedCommand}, and is used to
|
| - test that inherited commands inherit their bases' errors if they do not
|
| - respond to any of their own.
|
| - """
|
| - def resp(self):
|
| - raise InheritedError()
|
| - InheritedCommand.responder(resp)
|
| -
|
| -
|
| -
|
| -class AddedCommandProtocol(amp.AMP):
|
| - """
|
| - This is a protocol which responds to L{AddErrorsCommand}, and is used to
|
| - test that inherited commands can add their own new types of errors, but
|
| - still respond in the same way to their parents types of errors.
|
| - """
|
| - def resp(self, other):
|
| - if other:
|
| - raise OtherInheritedError()
|
| - else:
|
| - raise InheritedError()
|
| - AddErrorsCommand.responder(resp)
|
| -
|
| -
|
| -
|
| -class CommandInheritanceTests(unittest.TestCase):
|
| - """
|
| - These tests verify that commands inherit error conditions properly.
|
| - """
|
| -
|
| - def errorCheck(self, err, proto, cmd, **kw):
|
| - """
|
| - Check that the appropriate kind of error is raised when a given command
|
| - is sent to a given protocol.
|
| - """
|
| - c, s, p = connectedServerAndClient(ServerClass=proto,
|
| - ClientClass=proto)
|
| - d = c.callRemote(cmd, **kw)
|
| - d2 = self.failUnlessFailure(d, err)
|
| - p.flush()
|
| - return d2
|
| -
|
| -
|
| - def test_basicErrorPropagation(self):
|
| - """
|
| - Verify that errors specified in a superclass are respected normally
|
| - even if it has subclasses.
|
| - """
|
| - return self.errorCheck(
|
| - InheritedError, NormalCommandProtocol, BaseCommand)
|
| -
|
| -
|
| - def test_inheritedErrorPropagation(self):
|
| - """
|
| - Verify that errors specified in a superclass command are propagated to
|
| - its subclasses.
|
| - """
|
| - return self.errorCheck(
|
| - InheritedError, InheritedCommandProtocol, InheritedCommand)
|
| -
|
| -
|
| - def test_inheritedErrorAddition(self):
|
| - """
|
| - Verify that new errors specified in a subclass of an existing command
|
| - are honored even if the superclass defines some errors.
|
| - """
|
| - return self.errorCheck(
|
| - OtherInheritedError, AddedCommandProtocol, AddErrorsCommand, other=True)
|
| -
|
| -
|
| - def test_additionWithOriginalError(self):
|
| - """
|
| - Verify that errors specified in a command's superclass are respected
|
| - even if that command defines new errors itself.
|
| - """
|
| - return self.errorCheck(
|
| - InheritedError, AddedCommandProtocol, AddErrorsCommand, other=False)
|
| -
|
| -
|
| -def _loseAndPass(err, proto):
|
| - # be specific, pass on the error to the client.
|
| - err.trap(error.ConnectionLost, error.ConnectionDone)
|
| - del proto.connectionLost
|
| - proto.connectionLost(err)
|
| -
|
| -
|
| -class LiveFireBase:
|
| - """
|
| - Utility for connected reactor-using tests.
|
| - """
|
| -
|
| - def setUp(self):
|
| - """
|
| - Create an amp server and connect a client to it.
|
| - """
|
| - from twisted.internet import reactor
|
| - self.serverFactory = protocol.ServerFactory()
|
| - self.serverFactory.protocol = self.serverProto
|
| - self.clientFactory = protocol.ClientFactory()
|
| - self.clientFactory.protocol = self.clientProto
|
| - self.clientFactory.onMade = defer.Deferred()
|
| - self.serverFactory.onMade = defer.Deferred()
|
| - self.serverPort = reactor.listenTCP(0, self.serverFactory)
|
| - self.addCleanup(self.serverPort.stopListening)
|
| - self.clientConn = reactor.connectTCP(
|
| - '127.0.0.1', self.serverPort.getHost().port,
|
| - self.clientFactory)
|
| - self.addCleanup(self.clientConn.disconnect)
|
| - def getProtos(rlst):
|
| - self.cli = self.clientFactory.theProto
|
| - self.svr = self.serverFactory.theProto
|
| - dl = defer.DeferredList([self.clientFactory.onMade,
|
| - self.serverFactory.onMade])
|
| - return dl.addCallback(getProtos)
|
| -
|
| - def tearDown(self):
|
| - """
|
| - Cleanup client and server connections, and check the error got at
|
| - C{connectionLost}.
|
| - """
|
| - L = []
|
| - for conn in self.cli, self.svr:
|
| - if conn.transport is not None:
|
| - # depend on amp's function connection-dropping behavior
|
| - d = defer.Deferred().addErrback(_loseAndPass, conn)
|
| - conn.connectionLost = d.errback
|
| - conn.transport.loseConnection()
|
| - L.append(d)
|
| - return defer.gatherResults(L
|
| - ).addErrback(lambda first: first.value.subFailure)
|
| -
|
| -
|
| -def show(x):
|
| - import sys
|
| - sys.stdout.write(x+'\n')
|
| - sys.stdout.flush()
|
| -
|
| -
|
| -def tempSelfSigned():
|
| - from twisted.internet import ssl
|
| -
|
| - sharedDN = ssl.DN(CN='shared')
|
| - key = ssl.KeyPair.generate()
|
| - cr = key.certificateRequest(sharedDN)
|
| - sscrd = key.signCertificateRequest(
|
| - sharedDN, cr, lambda dn: True, 1234567)
|
| - cert = key.newCertificate(sscrd)
|
| - return cert
|
| -
|
| -tempcert = tempSelfSigned()
|
| -
|
| -
|
| -class LiveFireTLSTestCase(LiveFireBase, unittest.TestCase):
|
| - clientProto = SecurableProto
|
| - serverProto = SecurableProto
|
| - def test_liveFireCustomTLS(self):
|
| - """
|
| - Using real, live TLS, actually negotiate a connection.
|
| -
|
| - This also looks at the 'peerCertificate' attribute's correctness, since
|
| - that's actually loaded using OpenSSL calls, but the main purpose is to
|
| - make sure that we didn't miss anything obvious in iosim about TLS
|
| - negotiations.
|
| - """
|
| -
|
| - cert = tempcert
|
| -
|
| - self.svr.verifyFactory = lambda : [cert]
|
| - self.svr.certFactory = lambda : cert
|
| - # only needed on the server, we specify the client below.
|
| -
|
| - def secured(rslt):
|
| - x = cert.digest()
|
| - def pinged(rslt2):
|
| - # Interesting. OpenSSL won't even _tell_ us about the peer
|
| - # cert until we negotiate. we should be able to do this in
|
| - # 'secured' instead, but it looks like we can't. I think this
|
| - # is a bug somewhere far deeper than here.
|
| - self.failUnlessEqual(x, self.cli.hostCertificate.digest())
|
| - self.failUnlessEqual(x, self.cli.peerCertificate.digest())
|
| - self.failUnlessEqual(x, self.svr.hostCertificate.digest())
|
| - self.failUnlessEqual(x, self.svr.peerCertificate.digest())
|
| - return self.cli.callRemote(SecuredPing).addCallback(pinged)
|
| - return self.cli.callRemote(amp.StartTLS,
|
| - tls_localCertificate=cert,
|
| - tls_verifyAuthorities=[cert]).addCallback(secured)
|
| -
|
| -
|
| -class SlightlySmartTLS(SimpleSymmetricCommandProtocol):
|
| - """
|
| - Specific implementation of server side protocol with different
|
| - management of TLS.
|
| - """
|
| - def getTLSVars(self):
|
| - """
|
| - @return: the global C{tempcert} certificate as local certificate.
|
| - """
|
| - return dict(tls_localCertificate=tempcert)
|
| - amp.StartTLS.responder(getTLSVars)
|
| -
|
| -
|
| -class PlainVanillaLiveFire(LiveFireBase, unittest.TestCase):
|
| -
|
| - clientProto = SimpleSymmetricCommandProtocol
|
| - serverProto = SimpleSymmetricCommandProtocol
|
| -
|
| - def test_liveFireDefaultTLS(self):
|
| - """
|
| - Verify that out of the box, we can start TLS to at least encrypt the
|
| - connection, even if we don't have any certificates to use.
|
| - """
|
| - def secured(result):
|
| - return self.cli.callRemote(SecuredPing)
|
| - return self.cli.callRemote(amp.StartTLS).addCallback(secured)
|
| -
|
| -
|
| -class WithServerTLSVerification(LiveFireBase, unittest.TestCase):
|
| - clientProto = SimpleSymmetricCommandProtocol
|
| - serverProto = SlightlySmartTLS
|
| -
|
| - def test_anonymousVerifyingClient(self):
|
| - """
|
| - Verify that anonymous clients can verify server certificates.
|
| - """
|
| - def secured(result):
|
| - return self.cli.callRemote(SecuredPing)
|
| - return self.cli.callRemote(amp.StartTLS,
|
| - tls_verifyAuthorities=[tempcert]
|
| - ).addCallback(secured)
|
| -
|
| -
|
| -
|
| -class ProtocolIncludingArgument(amp.Argument):
|
| - """
|
| - An L{amp.Argument} which encodes its parser and serializer
|
| - arguments *including the protocol* into its parsed and serialized
|
| - forms.
|
| - """
|
| -
|
| - def fromStringProto(self, string, protocol):
|
| - """
|
| - Don't decode anything; just return all possible information.
|
| -
|
| - @return: A two-tuple of the input string and the protocol.
|
| - """
|
| - return (string, protocol)
|
| -
|
| - def toStringProto(self, obj, protocol):
|
| - """
|
| - Encode identifying information about L{object} and protocol
|
| - into a string for later verification.
|
| -
|
| - @type obj: L{object}
|
| - @type protocol: L{amp.AMP}
|
| - """
|
| - return "%s:%s" % (id(obj), id(protocol))
|
| -
|
| -
|
| -
|
| -class ProtocolIncludingCommand(amp.Command):
|
| - """
|
| - A command that has argument and response schemas which use
|
| - L{ProtocolIncludingArgument}.
|
| - """
|
| - arguments = [('weird', ProtocolIncludingArgument())]
|
| - response = [('weird', ProtocolIncludingArgument())]
|
| -
|
| -
|
| -
|
| -class MagicSchemaCommand(amp.Command):
|
| - """
|
| - A command which overrides L{parseResponse}, L{parseArguments}, and
|
| - L{makeResponse}.
|
| - """
|
| - def parseResponse(self, strings, protocol):
|
| - """
|
| - Don't do any parsing, just jam the input strings and protocol
|
| - onto the C{protocol.parseResponseArguments} attribute as a
|
| - two-tuple. Return the original strings.
|
| - """
|
| - protocol.parseResponseArguments = (strings, protocol)
|
| - return strings
|
| - parseResponse = classmethod(parseResponse)
|
| -
|
| -
|
| - def parseArguments(cls, strings, protocol):
|
| - """
|
| - Don't do any parsing, just jam the input strings and protocol
|
| - onto the C{protocol.parseArgumentsArguments} attribute as a
|
| - two-tuple. Return the original strings.
|
| - """
|
| - protocol.parseArgumentsArguments = (strings, protocol)
|
| - return strings
|
| - parseArguments = classmethod(parseArguments)
|
| -
|
| -
|
| - def makeArguments(cls, objects, protocol):
|
| - """
|
| - Don't do any serializing, just jam the input strings and protocol
|
| - onto the C{protocol.makeArgumentsArguments} attribute as a
|
| - two-tuple. Return the original strings.
|
| - """
|
| - protocol.makeArgumentsArguments = (objects, protocol)
|
| - return objects
|
| - makeArguments = classmethod(makeArguments)
|
| -
|
| -
|
| -
|
| -class NoNetworkProtocol(amp.AMP):
|
| - """
|
| - An L{amp.AMP} subclass which overrides private methods to avoid
|
| - testing the network. It also provides a responder for
|
| - L{MagicSchemaCommand} that does nothing, so that tests can test
|
| - aspects of the interaction of L{amp.Command}s and L{amp.AMP}.
|
| -
|
| - @ivar parseArgumentsArguments: Arguments that have been passed to any
|
| - L{MagicSchemaCommand}, if L{MagicSchemaCommand} has been handled by
|
| - this protocol.
|
| -
|
| - @ivar parseResponseArguments: Responses that have been returned from a
|
| - L{MagicSchemaCommand}, if L{MagicSchemaCommand} has been handled by
|
| - this protocol.
|
| -
|
| - @ivar makeArgumentsArguments: Arguments that have been serialized by any
|
| - L{MagicSchemaCommand}, if L{MagicSchemaCommand} has been handled by
|
| - this protocol.
|
| - """
|
| - def _sendBoxCommand(self, commandName, strings, requiresAnswer):
|
| - """
|
| - Return a Deferred which fires with the original strings.
|
| - """
|
| - return defer.succeed(strings)
|
| -
|
| - MagicSchemaCommand.responder(lambda s, weird: {})
|
| -
|
| -
|
| -
|
| -class MyBox(dict):
|
| - """
|
| - A unique dict subclass.
|
| - """
|
| -
|
| -
|
| -
|
| -class ProtocolIncludingCommandWithDifferentCommandType(
|
| - ProtocolIncludingCommand):
|
| - """
|
| - A L{ProtocolIncludingCommand} subclass whose commandType is L{MyBox}
|
| - """
|
| - commandType = MyBox
|
| -
|
| -
|
| -
|
| -class CommandTestCase(unittest.TestCase):
|
| - """
|
| - Tests for L{amp.Command}.
|
| - """
|
| -
|
| - def test_parseResponse(self):
|
| - """
|
| - There should be a class method of Command which accepts a
|
| - mapping of argument names to serialized forms and returns a
|
| - similar mapping whose values have been parsed via the
|
| - Command's response schema.
|
| - """
|
| - protocol = object()
|
| - result = 'whatever'
|
| - strings = {'weird': result}
|
| - self.assertEqual(
|
| - ProtocolIncludingCommand.parseResponse(strings, protocol),
|
| - {'weird': (result, protocol)})
|
| -
|
| -
|
| - def test_callRemoteCallsParseResponse(self):
|
| - """
|
| - Making a remote call on a L{amp.Command} subclass which
|
| - overrides the C{parseResponse} method should call that
|
| - C{parseResponse} method to get the response.
|
| - """
|
| - client = NoNetworkProtocol()
|
| - thingy = "weeoo"
|
| - response = client.callRemote(MagicSchemaCommand, weird=thingy)
|
| - def gotResponse(ign):
|
| - self.assertEquals(client.parseResponseArguments,
|
| - ({"weird": thingy}, client))
|
| - response.addCallback(gotResponse)
|
| - return response
|
| -
|
| -
|
| - def test_parseArguments(self):
|
| - """
|
| - There should be a class method of L{amp.Command} which accepts
|
| - a mapping of argument names to serialized forms and returns a
|
| - similar mapping whose values have been parsed via the
|
| - command's argument schema.
|
| - """
|
| - protocol = object()
|
| - result = 'whatever'
|
| - strings = {'weird': result}
|
| - self.assertEqual(
|
| - ProtocolIncludingCommand.parseArguments(strings, protocol),
|
| - {'weird': (result, protocol)})
|
| -
|
| -
|
| - def test_responderCallsParseArguments(self):
|
| - """
|
| - Making a remote call on a L{amp.Command} subclass which
|
| - overrides the C{parseArguments} method should call that
|
| - C{parseArguments} method to get the arguments.
|
| - """
|
| - protocol = NoNetworkProtocol()
|
| - responder = protocol.locateResponder(MagicSchemaCommand.commandName)
|
| - argument = object()
|
| - response = responder(dict(weird=argument))
|
| - response.addCallback(
|
| - lambda ign: self.assertEqual(protocol.parseArgumentsArguments,
|
| - ({"weird": argument}, protocol)))
|
| - return response
|
| -
|
| -
|
| - def test_makeArguments(self):
|
| - """
|
| - There should be a class method of L{amp.Command} which accepts
|
| - a mapping of argument names to objects and returns a similar
|
| - mapping whose values have been serialized via the command's
|
| - argument schema.
|
| - """
|
| - protocol = object()
|
| - argument = object()
|
| - objects = {'weird': argument}
|
| - self.assertEqual(
|
| - ProtocolIncludingCommand.makeArguments(objects, protocol),
|
| - {'weird': "%d:%d" % (id(argument), id(protocol))})
|
| -
|
| -
|
| - def test_makeArgumentsUsesCommandType(self):
|
| - """
|
| - L{amp.Command.makeArguments}'s return type should be the type
|
| - of the result of L{amp.Command.commandType}.
|
| - """
|
| - protocol = object()
|
| - objects = {"weird": "whatever"}
|
| -
|
| - result = ProtocolIncludingCommandWithDifferentCommandType.makeArguments(
|
| - objects, protocol)
|
| - self.assertIdentical(type(result), MyBox)
|
| -
|
| -
|
| - def test_callRemoteCallsMakeArguments(self):
|
| - """
|
| - Making a remote call on a L{amp.Command} subclass which
|
| - overrides the C{makeArguments} method should call that
|
| - C{makeArguments} method to get the response.
|
| - """
|
| - client = NoNetworkProtocol()
|
| - argument = object()
|
| - response = client.callRemote(MagicSchemaCommand, weird=argument)
|
| - def gotResponse(ign):
|
| - self.assertEqual(client.makeArgumentsArguments,
|
| - ({"weird": argument}, client))
|
| - response.addCallback(gotResponse)
|
| - return response
|
| -
|
| -if not interfaces.IReactorSSL.providedBy(reactor):
|
| - skipMsg = 'This test case requires SSL support in the reactor'
|
| - TLSTest.skip = skipMsg
|
| - LiveFireTLSTestCase.skip = skipMsg
|
| - PlainVanillaLiveFire.skip = skipMsg
|
| - WithServerTLSVerification.skip = skipMsg
|
| -
|
|
|