Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(54)

Side by Side Diff: third_party/twisted_8_1/twisted/words/test/test_jabberclient.py

Issue 12261012: Remove third_party/twisted_8_1 (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/build
Patch Set: Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 # Copyright (c) 2001-2007 Twisted Matrix Laboratories.
2 # See LICENSE for details.
3
4 """
5 Tests for L{twisted.words.protocols.jabber.client}
6 """
7
8 import sha
9 from twisted.trial import unittest
10 from twisted.words.protocols.jabber import client, error, jid, xmlstream
11 from twisted.words.protocols.jabber.sasl import SASLInitiatingInitializer
12
13 class CheckVersionInitializerTest(unittest.TestCase):
14 def setUp(self):
15 a = xmlstream.Authenticator()
16 xs = xmlstream.XmlStream(a)
17 self.init = client.CheckVersionInitializer(xs)
18
19 def testSupported(self):
20 """
21 Test supported version number 1.0
22 """
23 self.init.xmlstream.version = (1, 0)
24 self.init.initialize()
25
26 def testNotSupported(self):
27 """
28 Test unsupported version number 0.0, and check exception.
29 """
30 self.init.xmlstream.version = (0, 0)
31 exc = self.assertRaises(error.StreamError, self.init.initialize)
32 self.assertEquals('unsupported-version', exc.condition)
33
34
35 class InitiatingInitializerHarness(object):
36 def setUp(self):
37 self.output = []
38 self.authenticator = xmlstream.ConnectAuthenticator('example.org')
39 self.xmlstream = xmlstream.XmlStream(self.authenticator)
40 self.xmlstream.send = self.output.append
41 self.xmlstream.connectionMade()
42 self.xmlstream.dataReceived("<stream:stream xmlns='jabber:client' "
43 "xmlns:stream='http://etherx.jabber.org/streams' "
44 "from='example.com' id='12345' version='1.0'>")
45
46
47 class IQAuthInitializerTest(InitiatingInitializerHarness, unittest.TestCase):
48 def setUp(self):
49 super(IQAuthInitializerTest, self).setUp()
50 self.init = client.IQAuthInitializer(self.xmlstream)
51 self.authenticator.jid = jid.JID('user@example.com/resource')
52 self.authenticator.password = 'secret'
53
54 def testBasic(self):
55 """
56 Test basic operations with plain text authentication.
57
58 Set up a stream, and act as if authentication succeeds.
59 """
60 d = self.init.initialize()
61
62 # The initializer should have sent query to find out auth methods.
63
64 # Examine query.
65 iq = self.output[-1]
66 self.assertEquals('iq', iq.name)
67 self.assertEquals('get', iq['type'])
68 self.assertEquals(('jabber:iq:auth', 'query'),
69 (iq.children[0].uri, iq.children[0].name))
70
71 # Send server response
72 iq['type'] = 'result'
73 iq.query.addElement('username')
74 iq.query.addElement('password')
75 iq.query.addElement('resource')
76 self.xmlstream.dataReceived(iq.toXml())
77
78 # Upon receiving the response, the initializer can start authentication
79
80 iq = self.output[-1]
81 self.assertEquals('iq', iq.name)
82 self.assertEquals('set', iq['type'])
83 self.assertEquals(('jabber:iq:auth', 'query'),
84 (iq.children[0].uri, iq.children[0].name))
85 self.assertEquals('user', unicode(iq.query.username))
86 self.assertEquals('secret', unicode(iq.query.password))
87 self.assertEquals('resource', unicode(iq.query.resource))
88
89 # Send server response
90 self.xmlstream.dataReceived("<iq type='result' id='%s'/>" % iq['id'])
91 return d
92
93 def testDigest(self):
94 """
95 Test digest authentication.
96
97 Set up a stream, and act as if authentication succeeds.
98 """
99 d = self.init.initialize()
100
101 # The initializer should have sent query to find out auth methods.
102
103 iq = self.output[-1]
104
105 # Send server response
106 iq['type'] = 'result'
107 iq.query.addElement('username')
108 iq.query.addElement('digest')
109 iq.query.addElement('resource')
110 self.xmlstream.dataReceived(iq.toXml())
111
112 # Upon receiving the response, the initializer can start authentication
113
114 iq = self.output[-1]
115 self.assertEquals('iq', iq.name)
116 self.assertEquals('set', iq['type'])
117 self.assertEquals(('jabber:iq:auth', 'query'),
118 (iq.children[0].uri, iq.children[0].name))
119 self.assertEquals('user', unicode(iq.query.username))
120 self.assertEquals(sha.new('12345secret').hexdigest(),
121 unicode(iq.query.digest))
122 self.assertEquals('resource', unicode(iq.query.resource))
123
124 # Send server response
125 self.xmlstream.dataReceived("<iq type='result' id='%s'/>" % iq['id'])
126 return d
127
128 def testFailRequestFields(self):
129 """
130 Test failure of request for fields.
131 """
132 d = self.init.initialize()
133 iq = self.output[-1]
134 response = error.StanzaError('not-authorized').toResponse(iq)
135 self.xmlstream.dataReceived(response.toXml())
136 self.assertFailure(d, error.StanzaError)
137 return d
138
139 def testFailAuth(self):
140 """
141 Test failure of request for fields.
142 """
143 d = self.init.initialize()
144 iq = self.output[-1]
145 iq['type'] = 'result'
146 iq.query.addElement('username')
147 iq.query.addElement('password')
148 iq.query.addElement('resource')
149 self.xmlstream.dataReceived(iq.toXml())
150 iq = self.output[-1]
151 response = error.StanzaError('not-authorized').toResponse(iq)
152 self.xmlstream.dataReceived(response.toXml())
153 self.assertFailure(d, error.StanzaError)
154 return d
155
156
157 class BindInitializerTest(InitiatingInitializerHarness, unittest.TestCase):
158 def setUp(self):
159 super(BindInitializerTest, self).setUp()
160 self.init = client.BindInitializer(self.xmlstream)
161 self.authenticator.jid = jid.JID('user@example.com/resource')
162
163 def testBasic(self):
164 """
165 Test basic operations.
166
167 Set up a stream, and act as if resource binding succeeds.
168 """
169 def cb(result):
170 self.assertEquals(jid.JID('user@example.com/other resource'),
171 self.authenticator.jid)
172
173 d = self.init.start().addCallback(cb)
174 iq = self.output[-1]
175 self.assertEquals('iq', iq.name)
176 self.assertEquals('set', iq['type'])
177 self.assertEquals(('urn:ietf:params:xml:ns:xmpp-bind', 'bind'),
178 (iq.children[0].uri, iq.children[0].name))
179 iq['type'] = 'result'
180 iq.bind.children = []
181 iq.bind.addElement('jid', content='user@example.com/other resource')
182 self.xmlstream.dataReceived(iq.toXml())
183 return d
184
185 def testFailure(self):
186 """
187 Test basic operations.
188
189 Set up a stream, and act as if resource binding fails.
190 """
191 def cb(result):
192 self.assertEquals(jid.JID('user@example.com/resource'),
193 self.authenticator.jid)
194
195 d = self.init.start()
196 id = self.output[-1]['id']
197 self.xmlstream.dataReceived("<iq type='error' id='%s'/>" % id)
198 self.assertFailure(d, error.StanzaError)
199 return d
200
201
202 class SessionInitializerTest(InitiatingInitializerHarness, unittest.TestCase):
203
204 def setUp(self):
205 super(SessionInitializerTest, self).setUp()
206 self.init = client.SessionInitializer(self.xmlstream)
207
208 def testSuccess(self):
209 """
210 Test basic operations.
211
212 Set up a stream, and act as if resource binding succeeds.
213 """
214 d = self.init.start()
215 iq = self.output[-1]
216 self.assertEquals('iq', iq.name)
217 self.assertEquals('set', iq['type'])
218 self.assertEquals(('urn:ietf:params:xml:ns:xmpp-session', 'session'),
219 (iq.children[0].uri, iq.children[0].name))
220 self.xmlstream.dataReceived("<iq type='result' id='%s'/>" % iq['id'])
221 return d
222
223 def testFailure(self):
224 """
225 Test basic operations.
226
227 Set up a stream, and act as if session establishment succeeds.
228 """
229 d = self.init.start()
230 id = self.output[-1]['id']
231 self.xmlstream.dataReceived("<iq type='error' id='%s'/>" % id)
232 self.assertFailure(d, error.StanzaError)
233 return d
234
235
236 class XMPPAuthenticatorTest(unittest.TestCase):
237 """
238 Test for both XMPPAuthenticator and XMPPClientFactory.
239 """
240 def testBasic(self):
241 """
242 Test basic operations.
243
244 Setup an XMPPClientFactory, which sets up an XMPPAuthenticator, and let
245 it produce a protocol instance. Then inspect the instance variables of
246 the authenticator and XML stream objects.
247 """
248 self.client_jid = jid.JID('user@example.com/resource')
249
250 # Get an XmlStream instance. Note that it gets initialized with the
251 # XMPPAuthenticator (that has its associateWithXmlStream called) that
252 # is in turn initialized with the arguments to the factory.
253 xs = client.XMPPClientFactory(self.client_jid,
254 'secret').buildProtocol(None)
255
256 # test authenticator's instance variables
257 self.assertEqual('example.com', xs.authenticator.otherHost)
258 self.assertEqual(self.client_jid, xs.authenticator.jid)
259 self.assertEqual('secret', xs.authenticator.password)
260
261 # test list of initializers
262 version, tls, sasl, bind, session = xs.initializers
263
264 self.assert_(isinstance(tls, xmlstream.TLSInitiatingInitializer))
265 self.assert_(isinstance(sasl, SASLInitiatingInitializer))
266 self.assert_(isinstance(bind, client.BindInitializer))
267 self.assert_(isinstance(session, client.SessionInitializer))
268
269 self.assertFalse(tls.required)
270 self.assertTrue(sasl.required)
271 self.assertFalse(bind.required)
272 self.assertFalse(session.required)
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698