OLD | NEW |
| (Empty) |
1 # Copyright (c) 2001-2007 Twisted Matrix Laboratories. | |
2 # See LICENSE for details. | |
3 | |
4 """ | |
5 Tests for L{twisted.words.protocols.jabber.client} | |
6 """ | |
7 | |
8 import sha | |
9 from twisted.trial import unittest | |
10 from twisted.words.protocols.jabber import client, error, jid, xmlstream | |
11 from twisted.words.protocols.jabber.sasl import SASLInitiatingInitializer | |
12 | |
13 class CheckVersionInitializerTest(unittest.TestCase): | |
14 def setUp(self): | |
15 a = xmlstream.Authenticator() | |
16 xs = xmlstream.XmlStream(a) | |
17 self.init = client.CheckVersionInitializer(xs) | |
18 | |
19 def testSupported(self): | |
20 """ | |
21 Test supported version number 1.0 | |
22 """ | |
23 self.init.xmlstream.version = (1, 0) | |
24 self.init.initialize() | |
25 | |
26 def testNotSupported(self): | |
27 """ | |
28 Test unsupported version number 0.0, and check exception. | |
29 """ | |
30 self.init.xmlstream.version = (0, 0) | |
31 exc = self.assertRaises(error.StreamError, self.init.initialize) | |
32 self.assertEquals('unsupported-version', exc.condition) | |
33 | |
34 | |
35 class InitiatingInitializerHarness(object): | |
36 def setUp(self): | |
37 self.output = [] | |
38 self.authenticator = xmlstream.ConnectAuthenticator('example.org') | |
39 self.xmlstream = xmlstream.XmlStream(self.authenticator) | |
40 self.xmlstream.send = self.output.append | |
41 self.xmlstream.connectionMade() | |
42 self.xmlstream.dataReceived("<stream:stream xmlns='jabber:client' " | |
43 "xmlns:stream='http://etherx.jabber.org/streams' " | |
44 "from='example.com' id='12345' version='1.0'>") | |
45 | |
46 | |
47 class IQAuthInitializerTest(InitiatingInitializerHarness, unittest.TestCase): | |
48 def setUp(self): | |
49 super(IQAuthInitializerTest, self).setUp() | |
50 self.init = client.IQAuthInitializer(self.xmlstream) | |
51 self.authenticator.jid = jid.JID('user@example.com/resource') | |
52 self.authenticator.password = 'secret' | |
53 | |
54 def testBasic(self): | |
55 """ | |
56 Test basic operations with plain text authentication. | |
57 | |
58 Set up a stream, and act as if authentication succeeds. | |
59 """ | |
60 d = self.init.initialize() | |
61 | |
62 # The initializer should have sent query to find out auth methods. | |
63 | |
64 # Examine query. | |
65 iq = self.output[-1] | |
66 self.assertEquals('iq', iq.name) | |
67 self.assertEquals('get', iq['type']) | |
68 self.assertEquals(('jabber:iq:auth', 'query'), | |
69 (iq.children[0].uri, iq.children[0].name)) | |
70 | |
71 # Send server response | |
72 iq['type'] = 'result' | |
73 iq.query.addElement('username') | |
74 iq.query.addElement('password') | |
75 iq.query.addElement('resource') | |
76 self.xmlstream.dataReceived(iq.toXml()) | |
77 | |
78 # Upon receiving the response, the initializer can start authentication | |
79 | |
80 iq = self.output[-1] | |
81 self.assertEquals('iq', iq.name) | |
82 self.assertEquals('set', iq['type']) | |
83 self.assertEquals(('jabber:iq:auth', 'query'), | |
84 (iq.children[0].uri, iq.children[0].name)) | |
85 self.assertEquals('user', unicode(iq.query.username)) | |
86 self.assertEquals('secret', unicode(iq.query.password)) | |
87 self.assertEquals('resource', unicode(iq.query.resource)) | |
88 | |
89 # Send server response | |
90 self.xmlstream.dataReceived("<iq type='result' id='%s'/>" % iq['id']) | |
91 return d | |
92 | |
93 def testDigest(self): | |
94 """ | |
95 Test digest authentication. | |
96 | |
97 Set up a stream, and act as if authentication succeeds. | |
98 """ | |
99 d = self.init.initialize() | |
100 | |
101 # The initializer should have sent query to find out auth methods. | |
102 | |
103 iq = self.output[-1] | |
104 | |
105 # Send server response | |
106 iq['type'] = 'result' | |
107 iq.query.addElement('username') | |
108 iq.query.addElement('digest') | |
109 iq.query.addElement('resource') | |
110 self.xmlstream.dataReceived(iq.toXml()) | |
111 | |
112 # Upon receiving the response, the initializer can start authentication | |
113 | |
114 iq = self.output[-1] | |
115 self.assertEquals('iq', iq.name) | |
116 self.assertEquals('set', iq['type']) | |
117 self.assertEquals(('jabber:iq:auth', 'query'), | |
118 (iq.children[0].uri, iq.children[0].name)) | |
119 self.assertEquals('user', unicode(iq.query.username)) | |
120 self.assertEquals(sha.new('12345secret').hexdigest(), | |
121 unicode(iq.query.digest)) | |
122 self.assertEquals('resource', unicode(iq.query.resource)) | |
123 | |
124 # Send server response | |
125 self.xmlstream.dataReceived("<iq type='result' id='%s'/>" % iq['id']) | |
126 return d | |
127 | |
128 def testFailRequestFields(self): | |
129 """ | |
130 Test failure of request for fields. | |
131 """ | |
132 d = self.init.initialize() | |
133 iq = self.output[-1] | |
134 response = error.StanzaError('not-authorized').toResponse(iq) | |
135 self.xmlstream.dataReceived(response.toXml()) | |
136 self.assertFailure(d, error.StanzaError) | |
137 return d | |
138 | |
139 def testFailAuth(self): | |
140 """ | |
141 Test failure of request for fields. | |
142 """ | |
143 d = self.init.initialize() | |
144 iq = self.output[-1] | |
145 iq['type'] = 'result' | |
146 iq.query.addElement('username') | |
147 iq.query.addElement('password') | |
148 iq.query.addElement('resource') | |
149 self.xmlstream.dataReceived(iq.toXml()) | |
150 iq = self.output[-1] | |
151 response = error.StanzaError('not-authorized').toResponse(iq) | |
152 self.xmlstream.dataReceived(response.toXml()) | |
153 self.assertFailure(d, error.StanzaError) | |
154 return d | |
155 | |
156 | |
157 class BindInitializerTest(InitiatingInitializerHarness, unittest.TestCase): | |
158 def setUp(self): | |
159 super(BindInitializerTest, self).setUp() | |
160 self.init = client.BindInitializer(self.xmlstream) | |
161 self.authenticator.jid = jid.JID('user@example.com/resource') | |
162 | |
163 def testBasic(self): | |
164 """ | |
165 Test basic operations. | |
166 | |
167 Set up a stream, and act as if resource binding succeeds. | |
168 """ | |
169 def cb(result): | |
170 self.assertEquals(jid.JID('user@example.com/other resource'), | |
171 self.authenticator.jid) | |
172 | |
173 d = self.init.start().addCallback(cb) | |
174 iq = self.output[-1] | |
175 self.assertEquals('iq', iq.name) | |
176 self.assertEquals('set', iq['type']) | |
177 self.assertEquals(('urn:ietf:params:xml:ns:xmpp-bind', 'bind'), | |
178 (iq.children[0].uri, iq.children[0].name)) | |
179 iq['type'] = 'result' | |
180 iq.bind.children = [] | |
181 iq.bind.addElement('jid', content='user@example.com/other resource') | |
182 self.xmlstream.dataReceived(iq.toXml()) | |
183 return d | |
184 | |
185 def testFailure(self): | |
186 """ | |
187 Test basic operations. | |
188 | |
189 Set up a stream, and act as if resource binding fails. | |
190 """ | |
191 def cb(result): | |
192 self.assertEquals(jid.JID('user@example.com/resource'), | |
193 self.authenticator.jid) | |
194 | |
195 d = self.init.start() | |
196 id = self.output[-1]['id'] | |
197 self.xmlstream.dataReceived("<iq type='error' id='%s'/>" % id) | |
198 self.assertFailure(d, error.StanzaError) | |
199 return d | |
200 | |
201 | |
202 class SessionInitializerTest(InitiatingInitializerHarness, unittest.TestCase): | |
203 | |
204 def setUp(self): | |
205 super(SessionInitializerTest, self).setUp() | |
206 self.init = client.SessionInitializer(self.xmlstream) | |
207 | |
208 def testSuccess(self): | |
209 """ | |
210 Test basic operations. | |
211 | |
212 Set up a stream, and act as if resource binding succeeds. | |
213 """ | |
214 d = self.init.start() | |
215 iq = self.output[-1] | |
216 self.assertEquals('iq', iq.name) | |
217 self.assertEquals('set', iq['type']) | |
218 self.assertEquals(('urn:ietf:params:xml:ns:xmpp-session', 'session'), | |
219 (iq.children[0].uri, iq.children[0].name)) | |
220 self.xmlstream.dataReceived("<iq type='result' id='%s'/>" % iq['id']) | |
221 return d | |
222 | |
223 def testFailure(self): | |
224 """ | |
225 Test basic operations. | |
226 | |
227 Set up a stream, and act as if session establishment succeeds. | |
228 """ | |
229 d = self.init.start() | |
230 id = self.output[-1]['id'] | |
231 self.xmlstream.dataReceived("<iq type='error' id='%s'/>" % id) | |
232 self.assertFailure(d, error.StanzaError) | |
233 return d | |
234 | |
235 | |
236 class XMPPAuthenticatorTest(unittest.TestCase): | |
237 """ | |
238 Test for both XMPPAuthenticator and XMPPClientFactory. | |
239 """ | |
240 def testBasic(self): | |
241 """ | |
242 Test basic operations. | |
243 | |
244 Setup an XMPPClientFactory, which sets up an XMPPAuthenticator, and let | |
245 it produce a protocol instance. Then inspect the instance variables of | |
246 the authenticator and XML stream objects. | |
247 """ | |
248 self.client_jid = jid.JID('user@example.com/resource') | |
249 | |
250 # Get an XmlStream instance. Note that it gets initialized with the | |
251 # XMPPAuthenticator (that has its associateWithXmlStream called) that | |
252 # is in turn initialized with the arguments to the factory. | |
253 xs = client.XMPPClientFactory(self.client_jid, | |
254 'secret').buildProtocol(None) | |
255 | |
256 # test authenticator's instance variables | |
257 self.assertEqual('example.com', xs.authenticator.otherHost) | |
258 self.assertEqual(self.client_jid, xs.authenticator.jid) | |
259 self.assertEqual('secret', xs.authenticator.password) | |
260 | |
261 # test list of initializers | |
262 version, tls, sasl, bind, session = xs.initializers | |
263 | |
264 self.assert_(isinstance(tls, xmlstream.TLSInitiatingInitializer)) | |
265 self.assert_(isinstance(sasl, SASLInitiatingInitializer)) | |
266 self.assert_(isinstance(bind, client.BindInitializer)) | |
267 self.assert_(isinstance(session, client.SessionInitializer)) | |
268 | |
269 self.assertFalse(tls.required) | |
270 self.assertTrue(sasl.required) | |
271 self.assertFalse(bind.required) | |
272 self.assertFalse(session.required) | |
OLD | NEW |