| OLD | NEW |
| (Empty) |
| 1 # Copyright (c) 2001-2007 Twisted Matrix Laboratories. | |
| 2 # See LICENSE for details. | |
| 3 | |
| 4 """ | |
| 5 Tests for L{twisted.words.xish.xmlstream}. | |
| 6 """ | |
| 7 | |
| 8 from twisted.internet import defer, protocol | |
| 9 from twisted.trial import unittest | |
| 10 from twisted.words.xish import utility, xmlstream | |
| 11 | |
| 12 class XmlStreamTest(unittest.TestCase): | |
| 13 def setUp(self): | |
| 14 self.errorOccurred = False | |
| 15 self.streamStarted = False | |
| 16 self.streamEnded = False | |
| 17 self.outlist = [] | |
| 18 self.xmlstream = xmlstream.XmlStream() | |
| 19 self.xmlstream.transport = self | |
| 20 self.xmlstream.transport.write = self.outlist.append | |
| 21 | |
| 22 # Auxilary methods | |
| 23 def loseConnection(self): | |
| 24 self.xmlstream.connectionLost("no reason") | |
| 25 | |
| 26 def streamStartEvent(self, rootelem): | |
| 27 self.streamStarted = True | |
| 28 | |
| 29 def streamErrorEvent(self, errelem): | |
| 30 self.errorOccurred = True | |
| 31 | |
| 32 def streamEndEvent(self, _): | |
| 33 self.streamEnded = True | |
| 34 | |
| 35 def testBasicOp(self): | |
| 36 xs = self.xmlstream | |
| 37 xs.addObserver(xmlstream.STREAM_START_EVENT, | |
| 38 self.streamStartEvent) | |
| 39 xs.addObserver(xmlstream.STREAM_ERROR_EVENT, | |
| 40 self.streamErrorEvent) | |
| 41 xs.addObserver(xmlstream.STREAM_END_EVENT, | |
| 42 self.streamEndEvent) | |
| 43 | |
| 44 # Go... | |
| 45 xs.connectionMade() | |
| 46 xs.send("<root>") | |
| 47 self.assertEquals(self.outlist[0], "<root>") | |
| 48 | |
| 49 xs.dataReceived("<root>") | |
| 50 self.assertEquals(self.streamStarted, True) | |
| 51 | |
| 52 self.assertEquals(self.errorOccurred, False) | |
| 53 self.assertEquals(self.streamEnded, False) | |
| 54 xs.dataReceived("<child><unclosed></child>") | |
| 55 self.assertEquals(self.errorOccurred, True) | |
| 56 self.assertEquals(self.streamEnded, True) | |
| 57 | |
| 58 | |
| 59 class DummyProtocol(protocol.Protocol, utility.EventDispatcher): | |
| 60 """ | |
| 61 I am a protocol with an event dispatcher without further processing. | |
| 62 | |
| 63 This protocol is only used for testing XmlStreamFactoryMixin to make | |
| 64 sure the bootstrap observers are added to the protocol instance. | |
| 65 """ | |
| 66 | |
| 67 def __init__(self, *args, **kwargs): | |
| 68 self.args = args | |
| 69 self.kwargs = kwargs | |
| 70 self.observers = [] | |
| 71 | |
| 72 utility.EventDispatcher.__init__(self) | |
| 73 | |
| 74 | |
| 75 class XmlStreamFactoryMixinTest(unittest.TestCase): | |
| 76 | |
| 77 def test_buildProtocol(self): | |
| 78 """ | |
| 79 Test building of protocol. | |
| 80 | |
| 81 Arguments passed to Factory should be passed to protocol on | |
| 82 instantiation. Bootstrap observers should be setup. | |
| 83 """ | |
| 84 d = defer.Deferred() | |
| 85 | |
| 86 f = xmlstream.XmlStreamFactoryMixin(None, test=None) | |
| 87 f.protocol = DummyProtocol | |
| 88 f.addBootstrap('//event/myevent', d.callback) | |
| 89 xs = f.buildProtocol(None) | |
| 90 | |
| 91 self.assertEquals(f, xs.factory) | |
| 92 self.assertEquals((None,), xs.args) | |
| 93 self.assertEquals({'test': None}, xs.kwargs) | |
| 94 xs.dispatch(None, '//event/myevent') | |
| 95 return d | |
| 96 | |
| 97 def test_addAndRemoveBootstrap(self): | |
| 98 """ | |
| 99 Test addition and removal of a bootstrap event handler. | |
| 100 """ | |
| 101 def cb(self): | |
| 102 pass | |
| 103 | |
| 104 f = xmlstream.XmlStreamFactoryMixin(None, test=None) | |
| 105 | |
| 106 f.addBootstrap('//event/myevent', cb) | |
| 107 self.assertIn(('//event/myevent', cb), f.bootstraps) | |
| 108 | |
| 109 f.removeBootstrap('//event/myevent', cb) | |
| 110 self.assertNotIn(('//event/myevent', cb), f.bootstraps) | |
| OLD | NEW |