OLD | NEW |
| (Empty) |
1 # Copyright (c) 2001-2007 Twisted Matrix Laboratories. | |
2 # See LICENSE for details. | |
3 | |
4 """ | |
5 Tests for L{twisted.words.xish.xmlstream}. | |
6 """ | |
7 | |
8 from twisted.internet import defer, protocol | |
9 from twisted.trial import unittest | |
10 from twisted.words.xish import utility, xmlstream | |
11 | |
12 class XmlStreamTest(unittest.TestCase): | |
13 def setUp(self): | |
14 self.errorOccurred = False | |
15 self.streamStarted = False | |
16 self.streamEnded = False | |
17 self.outlist = [] | |
18 self.xmlstream = xmlstream.XmlStream() | |
19 self.xmlstream.transport = self | |
20 self.xmlstream.transport.write = self.outlist.append | |
21 | |
22 # Auxilary methods | |
23 def loseConnection(self): | |
24 self.xmlstream.connectionLost("no reason") | |
25 | |
26 def streamStartEvent(self, rootelem): | |
27 self.streamStarted = True | |
28 | |
29 def streamErrorEvent(self, errelem): | |
30 self.errorOccurred = True | |
31 | |
32 def streamEndEvent(self, _): | |
33 self.streamEnded = True | |
34 | |
35 def testBasicOp(self): | |
36 xs = self.xmlstream | |
37 xs.addObserver(xmlstream.STREAM_START_EVENT, | |
38 self.streamStartEvent) | |
39 xs.addObserver(xmlstream.STREAM_ERROR_EVENT, | |
40 self.streamErrorEvent) | |
41 xs.addObserver(xmlstream.STREAM_END_EVENT, | |
42 self.streamEndEvent) | |
43 | |
44 # Go... | |
45 xs.connectionMade() | |
46 xs.send("<root>") | |
47 self.assertEquals(self.outlist[0], "<root>") | |
48 | |
49 xs.dataReceived("<root>") | |
50 self.assertEquals(self.streamStarted, True) | |
51 | |
52 self.assertEquals(self.errorOccurred, False) | |
53 self.assertEquals(self.streamEnded, False) | |
54 xs.dataReceived("<child><unclosed></child>") | |
55 self.assertEquals(self.errorOccurred, True) | |
56 self.assertEquals(self.streamEnded, True) | |
57 | |
58 | |
59 class DummyProtocol(protocol.Protocol, utility.EventDispatcher): | |
60 """ | |
61 I am a protocol with an event dispatcher without further processing. | |
62 | |
63 This protocol is only used for testing XmlStreamFactoryMixin to make | |
64 sure the bootstrap observers are added to the protocol instance. | |
65 """ | |
66 | |
67 def __init__(self, *args, **kwargs): | |
68 self.args = args | |
69 self.kwargs = kwargs | |
70 self.observers = [] | |
71 | |
72 utility.EventDispatcher.__init__(self) | |
73 | |
74 | |
75 class XmlStreamFactoryMixinTest(unittest.TestCase): | |
76 | |
77 def test_buildProtocol(self): | |
78 """ | |
79 Test building of protocol. | |
80 | |
81 Arguments passed to Factory should be passed to protocol on | |
82 instantiation. Bootstrap observers should be setup. | |
83 """ | |
84 d = defer.Deferred() | |
85 | |
86 f = xmlstream.XmlStreamFactoryMixin(None, test=None) | |
87 f.protocol = DummyProtocol | |
88 f.addBootstrap('//event/myevent', d.callback) | |
89 xs = f.buildProtocol(None) | |
90 | |
91 self.assertEquals(f, xs.factory) | |
92 self.assertEquals((None,), xs.args) | |
93 self.assertEquals({'test': None}, xs.kwargs) | |
94 xs.dispatch(None, '//event/myevent') | |
95 return d | |
96 | |
97 def test_addAndRemoveBootstrap(self): | |
98 """ | |
99 Test addition and removal of a bootstrap event handler. | |
100 """ | |
101 def cb(self): | |
102 pass | |
103 | |
104 f = xmlstream.XmlStreamFactoryMixin(None, test=None) | |
105 | |
106 f.addBootstrap('//event/myevent', cb) | |
107 self.assertIn(('//event/myevent', cb), f.bootstraps) | |
108 | |
109 f.removeBootstrap('//event/myevent', cb) | |
110 self.assertNotIn(('//event/myevent', cb), f.bootstraps) | |
OLD | NEW |