| OLD | NEW |
| (Empty) |
| 1 # Copyright (c) 2001-2007 Twisted Matrix Laboratories. | |
| 2 # See LICENSE for details. | |
| 3 | |
| 4 from zope.interface import implements | |
| 5 from twisted.internet import defer | |
| 6 from twisted.trial import unittest | |
| 7 from twisted.words.protocols.jabber import sasl, sasl_mechanisms, xmlstream | |
| 8 from twisted.words.xish import domish | |
| 9 | |
| 10 NS_XMPP_SASL = 'urn:ietf:params:xml:ns:xmpp-sasl' | |
| 11 | |
| 12 class DummySASLMechanism(object): | |
| 13 """ | |
| 14 Dummy SASL mechanism. | |
| 15 | |
| 16 This just returns the initialResponse passed on creation, stores any | |
| 17 challenges and replies with an empty response. | |
| 18 | |
| 19 @ivar challenge: Last received challenge. | |
| 20 @type challenge: C{unicode}. | |
| 21 @ivar initialResponse: Initial response to be returned when requested | |
| 22 via C{getInitialResponse} or C{None}. | |
| 23 @type initialResponse: C{unicode} | |
| 24 """ | |
| 25 | |
| 26 implements(sasl_mechanisms.ISASLMechanism) | |
| 27 | |
| 28 challenge = None | |
| 29 name = "DUMMY" | |
| 30 | |
| 31 def __init__(self, initialResponse): | |
| 32 self.initialResponse = initialResponse | |
| 33 | |
| 34 def getInitialResponse(self): | |
| 35 return self.initialResponse | |
| 36 | |
| 37 def getResponse(self, challenge): | |
| 38 self.challenge = challenge | |
| 39 return "" | |
| 40 | |
| 41 class DummySASLInitiatingInitializer(sasl.SASLInitiatingInitializer): | |
| 42 """ | |
| 43 Dummy SASL Initializer for initiating entities. | |
| 44 | |
| 45 This hardwires the SASL mechanism to L{DummySASLMechanism}, that is | |
| 46 instantiated with the value of C{initialResponse}. | |
| 47 | |
| 48 @ivar initialResponse: The initial response to be returned by the | |
| 49 dummy SASL mechanism or C{None}. | |
| 50 @type initialResponse: C{unicode}. | |
| 51 """ | |
| 52 | |
| 53 initialResponse = None | |
| 54 | |
| 55 def setMechanism(self): | |
| 56 self.mechanism = DummySASLMechanism(self.initialResponse) | |
| 57 | |
| 58 class SASLInitiatingInitializerTest(unittest.TestCase): | |
| 59 | |
| 60 def setUp(self): | |
| 61 self.output = [] | |
| 62 | |
| 63 self.authenticator = xmlstream.Authenticator() | |
| 64 self.xmlstream = xmlstream.XmlStream(self.authenticator) | |
| 65 self.xmlstream.send = self.output.append | |
| 66 self.xmlstream.connectionMade() | |
| 67 self.xmlstream.dataReceived("<stream:stream xmlns='jabber:client' " | |
| 68 "xmlns:stream='http://etherx.jabber.org/streams' " | |
| 69 "from='example.com' id='12345' version='1.0'>") | |
| 70 self.init = DummySASLInitiatingInitializer(self.xmlstream) | |
| 71 | |
| 72 def test_onFailure(self): | |
| 73 """ | |
| 74 Test that the SASL error condition is correctly extracted. | |
| 75 """ | |
| 76 failure = domish.Element(('urn:ietf:params:xml:ns:xmpp-sasl', | |
| 77 'failure')) | |
| 78 failure.addElement('not-authorized') | |
| 79 self.init._deferred = defer.Deferred() | |
| 80 self.init.onFailure(failure) | |
| 81 self.assertFailure(self.init._deferred, sasl.SASLAuthError) | |
| 82 self.init._deferred.addCallback(lambda e: | |
| 83 self.assertEquals('not-authorized', | |
| 84 e.condition)) | |
| 85 return self.init._deferred | |
| 86 | |
| 87 def test_sendAuthInitialResponse(self): | |
| 88 """ | |
| 89 Test starting authentication with an initial response. | |
| 90 """ | |
| 91 self.init.initialResponse = "dummy" | |
| 92 self.init.start() | |
| 93 auth = self.output[0] | |
| 94 self.assertEquals(NS_XMPP_SASL, auth.uri) | |
| 95 self.assertEquals('auth', auth.name) | |
| 96 self.assertEquals('DUMMY', auth['mechanism']) | |
| 97 self.assertEquals('ZHVtbXk=', str(auth)) | |
| 98 | |
| 99 def test_sendAuthNoInitialResponse(self): | |
| 100 """ | |
| 101 Test starting authentication without an initial response. | |
| 102 """ | |
| 103 self.init.initialResponse = None | |
| 104 self.init.start() | |
| 105 auth = self.output[0] | |
| 106 self.assertEquals('', str(auth)) | |
| 107 | |
| 108 def test_sendAuthEmptyInitialResponse(self): | |
| 109 """ | |
| 110 Test starting authentication where the initial response is empty. | |
| 111 """ | |
| 112 self.init.initialResponse = "" | |
| 113 self.init.start() | |
| 114 auth = self.output[0] | |
| 115 self.assertEquals('=', str(auth)) | |
| 116 | |
| 117 def test_onChallenge(self): | |
| 118 """ | |
| 119 Test receiving a challenge message. | |
| 120 """ | |
| 121 d = self.init.start() | |
| 122 challenge = domish.Element((NS_XMPP_SASL, 'challenge')) | |
| 123 challenge.addContent('bXkgY2hhbGxlbmdl') | |
| 124 self.init.onChallenge(challenge) | |
| 125 self.assertEqual('my challenge', self.init.mechanism.challenge) | |
| 126 self.init.onSuccess(None) | |
| 127 return d | |
| 128 | |
| 129 def test_onChallengeEmpty(self): | |
| 130 """ | |
| 131 Test receiving an empty challenge message. | |
| 132 """ | |
| 133 d = self.init.start() | |
| 134 challenge = domish.Element((NS_XMPP_SASL, 'challenge')) | |
| 135 self.init.onChallenge(challenge) | |
| 136 self.assertEqual('', self.init.mechanism.challenge) | |
| 137 self.init.onSuccess(None) | |
| 138 return d | |
| 139 | |
| 140 def test_onChallengeIllegalPadding(self): | |
| 141 """ | |
| 142 Test receiving a challenge message with illegal padding. | |
| 143 """ | |
| 144 d = self.init.start() | |
| 145 challenge = domish.Element((NS_XMPP_SASL, 'challenge')) | |
| 146 challenge.addContent('bXkg=Y2hhbGxlbmdl') | |
| 147 self.init.onChallenge(challenge) | |
| 148 self.assertFailure(d, sasl.SASLIncorrectEncodingError) | |
| 149 return d | |
| 150 | |
| 151 def test_onChallengeIllegalCharacters(self): | |
| 152 """ | |
| 153 Test receiving a challenge message with illegal characters. | |
| 154 """ | |
| 155 d = self.init.start() | |
| 156 challenge = domish.Element((NS_XMPP_SASL, 'challenge')) | |
| 157 challenge.addContent('bXkg*Y2hhbGxlbmdl') | |
| 158 self.init.onChallenge(challenge) | |
| 159 self.assertFailure(d, sasl.SASLIncorrectEncodingError) | |
| 160 return d | |
| 161 | |
| 162 def test_onChallengeMalformed(self): | |
| 163 """ | |
| 164 Test receiving a malformed challenge message. | |
| 165 """ | |
| 166 d = self.init.start() | |
| 167 challenge = domish.Element((NS_XMPP_SASL, 'challenge')) | |
| 168 challenge.addContent('a') | |
| 169 self.init.onChallenge(challenge) | |
| 170 self.assertFailure(d, sasl.SASLIncorrectEncodingError) | |
| 171 return d | |
| OLD | NEW |