OLD | NEW |
| (Empty) |
1 # Copyright (c) 2001-2007 Twisted Matrix Laboratories. | |
2 # See LICENSE for details. | |
3 | |
4 from zope.interface import implements | |
5 from twisted.internet import defer | |
6 from twisted.trial import unittest | |
7 from twisted.words.protocols.jabber import sasl, sasl_mechanisms, xmlstream | |
8 from twisted.words.xish import domish | |
9 | |
10 NS_XMPP_SASL = 'urn:ietf:params:xml:ns:xmpp-sasl' | |
11 | |
12 class DummySASLMechanism(object): | |
13 """ | |
14 Dummy SASL mechanism. | |
15 | |
16 This just returns the initialResponse passed on creation, stores any | |
17 challenges and replies with an empty response. | |
18 | |
19 @ivar challenge: Last received challenge. | |
20 @type challenge: C{unicode}. | |
21 @ivar initialResponse: Initial response to be returned when requested | |
22 via C{getInitialResponse} or C{None}. | |
23 @type initialResponse: C{unicode} | |
24 """ | |
25 | |
26 implements(sasl_mechanisms.ISASLMechanism) | |
27 | |
28 challenge = None | |
29 name = "DUMMY" | |
30 | |
31 def __init__(self, initialResponse): | |
32 self.initialResponse = initialResponse | |
33 | |
34 def getInitialResponse(self): | |
35 return self.initialResponse | |
36 | |
37 def getResponse(self, challenge): | |
38 self.challenge = challenge | |
39 return "" | |
40 | |
41 class DummySASLInitiatingInitializer(sasl.SASLInitiatingInitializer): | |
42 """ | |
43 Dummy SASL Initializer for initiating entities. | |
44 | |
45 This hardwires the SASL mechanism to L{DummySASLMechanism}, that is | |
46 instantiated with the value of C{initialResponse}. | |
47 | |
48 @ivar initialResponse: The initial response to be returned by the | |
49 dummy SASL mechanism or C{None}. | |
50 @type initialResponse: C{unicode}. | |
51 """ | |
52 | |
53 initialResponse = None | |
54 | |
55 def setMechanism(self): | |
56 self.mechanism = DummySASLMechanism(self.initialResponse) | |
57 | |
58 class SASLInitiatingInitializerTest(unittest.TestCase): | |
59 | |
60 def setUp(self): | |
61 self.output = [] | |
62 | |
63 self.authenticator = xmlstream.Authenticator() | |
64 self.xmlstream = xmlstream.XmlStream(self.authenticator) | |
65 self.xmlstream.send = self.output.append | |
66 self.xmlstream.connectionMade() | |
67 self.xmlstream.dataReceived("<stream:stream xmlns='jabber:client' " | |
68 "xmlns:stream='http://etherx.jabber.org/streams' " | |
69 "from='example.com' id='12345' version='1.0'>") | |
70 self.init = DummySASLInitiatingInitializer(self.xmlstream) | |
71 | |
72 def test_onFailure(self): | |
73 """ | |
74 Test that the SASL error condition is correctly extracted. | |
75 """ | |
76 failure = domish.Element(('urn:ietf:params:xml:ns:xmpp-sasl', | |
77 'failure')) | |
78 failure.addElement('not-authorized') | |
79 self.init._deferred = defer.Deferred() | |
80 self.init.onFailure(failure) | |
81 self.assertFailure(self.init._deferred, sasl.SASLAuthError) | |
82 self.init._deferred.addCallback(lambda e: | |
83 self.assertEquals('not-authorized', | |
84 e.condition)) | |
85 return self.init._deferred | |
86 | |
87 def test_sendAuthInitialResponse(self): | |
88 """ | |
89 Test starting authentication with an initial response. | |
90 """ | |
91 self.init.initialResponse = "dummy" | |
92 self.init.start() | |
93 auth = self.output[0] | |
94 self.assertEquals(NS_XMPP_SASL, auth.uri) | |
95 self.assertEquals('auth', auth.name) | |
96 self.assertEquals('DUMMY', auth['mechanism']) | |
97 self.assertEquals('ZHVtbXk=', str(auth)) | |
98 | |
99 def test_sendAuthNoInitialResponse(self): | |
100 """ | |
101 Test starting authentication without an initial response. | |
102 """ | |
103 self.init.initialResponse = None | |
104 self.init.start() | |
105 auth = self.output[0] | |
106 self.assertEquals('', str(auth)) | |
107 | |
108 def test_sendAuthEmptyInitialResponse(self): | |
109 """ | |
110 Test starting authentication where the initial response is empty. | |
111 """ | |
112 self.init.initialResponse = "" | |
113 self.init.start() | |
114 auth = self.output[0] | |
115 self.assertEquals('=', str(auth)) | |
116 | |
117 def test_onChallenge(self): | |
118 """ | |
119 Test receiving a challenge message. | |
120 """ | |
121 d = self.init.start() | |
122 challenge = domish.Element((NS_XMPP_SASL, 'challenge')) | |
123 challenge.addContent('bXkgY2hhbGxlbmdl') | |
124 self.init.onChallenge(challenge) | |
125 self.assertEqual('my challenge', self.init.mechanism.challenge) | |
126 self.init.onSuccess(None) | |
127 return d | |
128 | |
129 def test_onChallengeEmpty(self): | |
130 """ | |
131 Test receiving an empty challenge message. | |
132 """ | |
133 d = self.init.start() | |
134 challenge = domish.Element((NS_XMPP_SASL, 'challenge')) | |
135 self.init.onChallenge(challenge) | |
136 self.assertEqual('', self.init.mechanism.challenge) | |
137 self.init.onSuccess(None) | |
138 return d | |
139 | |
140 def test_onChallengeIllegalPadding(self): | |
141 """ | |
142 Test receiving a challenge message with illegal padding. | |
143 """ | |
144 d = self.init.start() | |
145 challenge = domish.Element((NS_XMPP_SASL, 'challenge')) | |
146 challenge.addContent('bXkg=Y2hhbGxlbmdl') | |
147 self.init.onChallenge(challenge) | |
148 self.assertFailure(d, sasl.SASLIncorrectEncodingError) | |
149 return d | |
150 | |
151 def test_onChallengeIllegalCharacters(self): | |
152 """ | |
153 Test receiving a challenge message with illegal characters. | |
154 """ | |
155 d = self.init.start() | |
156 challenge = domish.Element((NS_XMPP_SASL, 'challenge')) | |
157 challenge.addContent('bXkg*Y2hhbGxlbmdl') | |
158 self.init.onChallenge(challenge) | |
159 self.assertFailure(d, sasl.SASLIncorrectEncodingError) | |
160 return d | |
161 | |
162 def test_onChallengeMalformed(self): | |
163 """ | |
164 Test receiving a malformed challenge message. | |
165 """ | |
166 d = self.init.start() | |
167 challenge = domish.Element((NS_XMPP_SASL, 'challenge')) | |
168 challenge.addContent('a') | |
169 self.init.onChallenge(challenge) | |
170 self.assertFailure(d, sasl.SASLIncorrectEncodingError) | |
171 return d | |
OLD | NEW |