Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(53)

Unified Diff: third_party/twisted_8_1/twisted/words/test/test_jabberxmlstream.py

Issue 12261012: Remove third_party/twisted_8_1 (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/build
Patch Set: Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: third_party/twisted_8_1/twisted/words/test/test_jabberxmlstream.py
diff --git a/third_party/twisted_8_1/twisted/words/test/test_jabberxmlstream.py b/third_party/twisted_8_1/twisted/words/test/test_jabberxmlstream.py
deleted file mode 100644
index 2313b0d036b9b2aec20092006155015c9b18f1c8..0000000000000000000000000000000000000000
--- a/third_party/twisted_8_1/twisted/words/test/test_jabberxmlstream.py
+++ /dev/null
@@ -1,1174 +0,0 @@
-# Copyright (c) 2001-2008 Twisted Matrix Laboratories.
-# See LICENSE for details.
-
-"""
-Tests for L{twisted.words.protocols.jabber.xmlstream}.
-"""
-
-from twisted.trial import unittest
-
-from zope.interface.verify import verifyObject
-
-from twisted.internet import defer, task
-from twisted.internet.error import ConnectionLost
-from twisted.test import proto_helpers
-from twisted.words.xish import domish
-from twisted.words.protocols.jabber import error, ijabber, jid, xmlstream
-
-
-
-NS_XMPP_TLS = 'urn:ietf:params:xml:ns:xmpp-tls'
-
-
-
-class IQTest(unittest.TestCase):
- """
- Tests both IQ and the associated IIQResponseTracker callback.
- """
-
- def setUp(self):
- authenticator = xmlstream.ConnectAuthenticator('otherhost')
- authenticator.namespace = 'testns'
- self.xmlstream = xmlstream.XmlStream(authenticator)
- self.clock = task.Clock()
- self.xmlstream._callLater = self.clock.callLater
- self.xmlstream.makeConnection(proto_helpers.StringTransport())
- self.xmlstream.dataReceived(
- "<stream:stream xmlns:stream='http://etherx.jabber.org/streams' "
- "xmlns='testns' from='otherhost' version='1.0'>")
- self.iq = xmlstream.IQ(self.xmlstream, 'get')
-
-
- def testBasic(self):
- self.assertEquals(self.iq['type'], 'get')
- self.assertTrue(self.iq['id'])
-
-
- def testSend(self):
- self.xmlstream.transport.clear()
- self.iq.send()
- self.assertEquals("<iq type='get' id='%s'/>" % self.iq['id'],
- self.xmlstream.transport.value())
-
-
- def testResultResponse(self):
- def cb(result):
- self.assertEquals(result['type'], 'result')
-
- d = self.iq.send()
- d.addCallback(cb)
-
- xs = self.xmlstream
- xs.dataReceived("<iq type='result' id='%s'/>" % self.iq['id'])
- return d
-
-
- def testErrorResponse(self):
- d = self.iq.send()
- self.assertFailure(d, error.StanzaError)
-
- xs = self.xmlstream
- xs.dataReceived("<iq type='error' id='%s'/>" % self.iq['id'])
- return d
-
-
- def testNonTrackedResponse(self):
- """
- Test that untracked iq responses don't trigger any action.
-
- Untracked means that the id of the incoming response iq is not
- in the stream's C{iqDeferreds} dictionary.
- """
- xs = self.xmlstream
- xmlstream.upgradeWithIQResponseTracker(xs)
-
- # Make sure we aren't tracking any iq's.
- self.failIf(xs.iqDeferreds)
-
- # Set up a fallback handler that checks the stanza's handled attribute.
- # If that is set to True, the iq tracker claims to have handled the
- # response.
- def cb(iq):
- self.failIf(getattr(iq, 'handled', False))
-
- xs.addObserver("/iq", cb, -1)
-
- # Receive an untracked iq response
- xs.dataReceived("<iq type='result' id='test'/>")
-
-
- def testCleanup(self):
- """
- Test if the deferred associated with an iq request is removed
- from the list kept in the L{XmlStream} object after it has
- been fired.
- """
-
- d = self.iq.send()
- xs = self.xmlstream
- xs.dataReceived("<iq type='result' id='%s'/>" % self.iq['id'])
- self.assertNotIn(self.iq['id'], xs.iqDeferreds)
- return d
-
-
- def testDisconnectCleanup(self):
- """
- Test if deferreds for iq's that haven't yet received a response
- have their errback called on stream disconnect.
- """
-
- d = self.iq.send()
- xs = self.xmlstream
- xs.connectionLost("Closed by peer")
- self.assertFailure(d, ConnectionLost)
- return d
-
-
- def testNoModifyingDict(self):
- """
- Test to make sure the errbacks cannot cause the iteration of the
- iqDeferreds to blow up in our face.
- """
-
- def eb(failure):
- d = xmlstream.IQ(self.xmlstream).send()
- d.addErrback(eb)
-
- d = self.iq.send()
- d.addErrback(eb)
- self.xmlstream.connectionLost("Closed by peer")
- return d
-
-
- def testRequestTimingOut(self):
- """
- Test that an iq request with a defined timeout times out.
- """
- self.iq.timeout = 60
- d = self.iq.send()
- self.assertFailure(d, xmlstream.TimeoutError)
-
- self.clock.pump([1, 60])
- self.failIf(self.clock.calls)
- self.failIf(self.xmlstream.iqDeferreds)
- return d
-
-
- def testRequestNotTimingOut(self):
- """
- Test that an iq request with a defined timeout does not time out
- when a response was received before the timeout period elapsed.
- """
- self.iq.timeout = 60
- d = self.iq.send()
- self.clock.callLater(1, self.xmlstream.dataReceived,
- "<iq type='result' id='%s'/>" % self.iq['id'])
- self.clock.pump([1, 1])
- self.failIf(self.clock.calls)
- return d
-
-
- def testDisconnectTimeoutCancellation(self):
- """
- Test if timeouts for iq's that haven't yet received a response
- are cancelled on stream disconnect.
- """
-
- self.iq.timeout = 60
- d = self.iq.send()
-
- xs = self.xmlstream
- xs.connectionLost("Closed by peer")
- self.assertFailure(d, ConnectionLost)
- self.failIf(self.clock.calls)
- return d
-
-
-
-class XmlStreamTest(unittest.TestCase):
-
- def onStreamStart(self, obj):
- self.gotStreamStart = True
-
-
- def onStreamEnd(self, obj):
- self.gotStreamEnd = True
-
-
- def onStreamError(self, obj):
- self.gotStreamError = True
-
-
- def setUp(self):
- """
- Set up XmlStream and several observers.
- """
- self.gotStreamStart = False
- self.gotStreamEnd = False
- self.gotStreamError = False
- xs = xmlstream.XmlStream(xmlstream.Authenticator())
- xs.addObserver('//event/stream/start', self.onStreamStart)
- xs.addObserver('//event/stream/end', self.onStreamEnd)
- xs.addObserver('//event/stream/error', self.onStreamError)
- xs.makeConnection(proto_helpers.StringTransportWithDisconnection())
- xs.transport.protocol = xs
- xs.namespace = 'testns'
- xs.version = (1, 0)
- self.xmlstream = xs
-
-
- def test_sendHeaderBasic(self):
- """
- Basic test on the header sent by sendHeader.
- """
- xs = self.xmlstream
- xs.sendHeader()
- splitHeader = self.xmlstream.transport.value()[0:-1].split(' ')
- self.assertIn("<stream:stream", splitHeader)
- self.assertIn("xmlns:stream='http://etherx.jabber.org/streams'",
- splitHeader)
- self.assertIn("xmlns='testns'", splitHeader)
- self.assertIn("version='1.0'", splitHeader)
- self.assertTrue(xs._headerSent)
-
-
- def test_sendHeaderAdditionalNamespaces(self):
- """
- Test for additional namespace declarations.
- """
- xs = self.xmlstream
- xs.prefixes['jabber:server:dialback'] = 'db'
- xs.sendHeader()
- splitHeader = self.xmlstream.transport.value()[0:-1].split(' ')
- self.assertIn("<stream:stream", splitHeader)
- self.assertIn("xmlns:stream='http://etherx.jabber.org/streams'",
- splitHeader)
- self.assertIn("xmlns:db='jabber:server:dialback'", splitHeader)
- self.assertIn("xmlns='testns'", splitHeader)
- self.assertIn("version='1.0'", splitHeader)
- self.assertTrue(xs._headerSent)
-
-
- def test_sendHeaderInitiating(self):
- """
- Test addressing when initiating a stream.
- """
- xs = self.xmlstream
- xs.thisEntity = jid.JID('thisHost')
- xs.otherEntity = jid.JID('otherHost')
- xs.initiating = True
- xs.sendHeader()
- splitHeader = xs.transport.value()[0:-1].split(' ')
- self.assertIn("to='otherhost'", splitHeader)
- self.assertIn("from='thishost'", splitHeader)
-
-
- def test_sendHeaderReceiving(self):
- """
- Test addressing when receiving a stream.
- """
- xs = self.xmlstream
- xs.thisEntity = jid.JID('thisHost')
- xs.otherEntity = jid.JID('otherHost')
- xs.initiating = False
- xs.sid = 'session01'
- xs.sendHeader()
- splitHeader = xs.transport.value()[0:-1].split(' ')
- self.assertIn("to='otherhost'", splitHeader)
- self.assertIn("from='thishost'", splitHeader)
- self.assertIn("id='session01'", splitHeader)
-
-
- def test_receiveStreamError(self):
- """
- Test events when a stream error is received.
- """
- xs = self.xmlstream
- xs.dataReceived("<stream:stream xmlns='jabber:client' "
- "xmlns:stream='http://etherx.jabber.org/streams' "
- "from='example.com' id='12345' version='1.0'>")
- xs.dataReceived("<stream:error/>")
- self.assertTrue(self.gotStreamError)
- self.assertTrue(self.gotStreamEnd)
-
-
- def test_sendStreamErrorInitiating(self):
- """
- Test sendStreamError on an initiating xmlstream with a header sent.
-
- An error should be sent out and the connection lost.
- """
- xs = self.xmlstream
- xs.initiating = True
- xs.sendHeader()
- xs.transport.clear()
- xs.sendStreamError(error.StreamError('version-unsupported'))
- self.assertNotEqual('', xs.transport.value())
- self.assertTrue(self.gotStreamEnd)
-
-
- def test_sendStreamErrorInitiatingNoHeader(self):
- """
- Test sendStreamError on an initiating xmlstream without having sent a
- header.
-
- In this case, no header should be generated. Also, the error should
- not be sent out on the stream. Just closing the connection.
- """
- xs = self.xmlstream
- xs.initiating = True
- xs.transport.clear()
- xs.sendStreamError(error.StreamError('version-unsupported'))
- self.assertNot(xs._headerSent)
- self.assertEqual('', xs.transport.value())
- self.assertTrue(self.gotStreamEnd)
-
-
- def test_sendStreamErrorReceiving(self):
- """
- Test sendStreamError on a receiving xmlstream with a header sent.
-
- An error should be sent out and the connection lost.
- """
- xs = self.xmlstream
- xs.initiating = False
- xs.sendHeader()
- xs.transport.clear()
- xs.sendStreamError(error.StreamError('version-unsupported'))
- self.assertNotEqual('', xs.transport.value())
- self.assertTrue(self.gotStreamEnd)
-
-
- def test_sendStreamErrorReceivingNoHeader(self):
- """
- Test sendStreamError on a receiving xmlstream without having sent a
- header.
-
- In this case, a header should be generated. Then, the error should
- be sent out on the stream followed by closing the connection.
- """
- xs = self.xmlstream
- xs.initiating = False
- xs.transport.clear()
- xs.sendStreamError(error.StreamError('version-unsupported'))
- self.assertTrue(xs._headerSent)
- self.assertNotEqual('', xs.transport.value())
- self.assertTrue(self.gotStreamEnd)
-
-
- def test_reset(self):
- """
- Test resetting the XML stream to start a new layer.
- """
- xs = self.xmlstream
- xs.sendHeader()
- stream = xs.stream
- xs.reset()
- self.assertNotEqual(stream, xs.stream)
- self.assertNot(xs._headerSent)
-
-
- def test_send(self):
- """
- Test send with various types of objects.
- """
- xs = self.xmlstream
- xs.send('<presence/>')
- self.assertEqual(xs.transport.value(), '<presence/>')
-
- xs.transport.clear()
- el = domish.Element(('testns', 'presence'))
- xs.send(el)
- self.assertEqual(xs.transport.value(), '<presence/>')
-
- xs.transport.clear()
- el = domish.Element(('http://etherx.jabber.org/streams', 'features'))
- xs.send(el)
- self.assertEqual(xs.transport.value(), '<stream:features/>')
-
-
- def test_authenticator(self):
- """
- Test that the associated authenticator is correctly called.
- """
- connectionMadeCalls = []
- streamStartedCalls = []
- associateWithStreamCalls = []
-
- class TestAuthenticator:
- def connectionMade(self):
- connectionMadeCalls.append(None)
-
- def streamStarted(self, rootElement):
- streamStartedCalls.append(rootElement)
-
- def associateWithStream(self, xs):
- associateWithStreamCalls.append(xs)
-
- a = TestAuthenticator()
- xs = xmlstream.XmlStream(a)
- self.assertEqual([xs], associateWithStreamCalls)
- xs.connectionMade()
- self.assertEqual([None], connectionMadeCalls)
- xs.dataReceived("<stream:stream xmlns='jabber:client' "
- "xmlns:stream='http://etherx.jabber.org/streams' "
- "from='example.com' id='12345'>")
- self.assertEqual(1, len(streamStartedCalls))
- xs.reset()
- self.assertEqual([None], connectionMadeCalls)
-
-
-
-class TestError(Exception):
- pass
-
-
-
-class AuthenticatorTest(unittest.TestCase):
- def setUp(self):
- self.authenticator = xmlstream.ListenAuthenticator()
- self.xmlstream = xmlstream.XmlStream(self.authenticator)
-
-
- def test_streamStart(self):
- """
- Test streamStart to fill the appropriate attributes from the
- stream header.
- """
- xs = self.xmlstream
- xs.makeConnection(proto_helpers.StringTransport())
- xs.dataReceived("<stream:stream xmlns='jabber:client' "
- "xmlns:stream='http://etherx.jabber.org/streams' "
- "from='example.org' to='example.com' id='12345' "
- "version='1.0'>")
- self.assertEqual((1, 0), xs.version)
- self.assertIdentical(None, xs.sid)
- self.assertEqual('jabber:client', xs.namespace)
- self.assertIdentical(None, xs.otherEntity)
- self.assertEqual('example.com', xs.thisEntity.host)
-
-
- def test_streamStartLegacy(self):
- """
- Test streamStart to fill the appropriate attributes from the
- stream header for a pre-XMPP-1.0 header.
- """
- xs = self.xmlstream
- xs.makeConnection(proto_helpers.StringTransport())
- xs.dataReceived("<stream:stream xmlns='jabber:client' "
- "xmlns:stream='http://etherx.jabber.org/streams' "
- "from='example.com' id='12345'>")
- self.assertEqual((0, 0), xs.version)
-
-
- def test_streamBadVersionOneDigit(self):
- """
- Test streamStart to fill the appropriate attributes from the
- stream header for a version with only one digit.
- """
- xs = self.xmlstream
- xs.makeConnection(proto_helpers.StringTransport())
- xs.dataReceived("<stream:stream xmlns='jabber:client' "
- "xmlns:stream='http://etherx.jabber.org/streams' "
- "from='example.com' id='12345' version='1'>")
- self.assertEqual((0, 0), xs.version)
-
-
- def test_streamBadVersionNoNumber(self):
- """
- Test streamStart to fill the appropriate attributes from the
- stream header for a malformed version.
- """
- xs = self.xmlstream
- xs.makeConnection(proto_helpers.StringTransport())
- xs.dataReceived("<stream:stream xmlns='jabber:client' "
- "xmlns:stream='http://etherx.jabber.org/streams' "
- "from='example.com' id='12345' version='blah'>")
- self.assertEqual((0, 0), xs.version)
-
-
-
-class ConnectAuthenticatorTest(unittest.TestCase):
-
- def setUp(self):
- self.gotAuthenticated = False
- self.initFailure = None
- self.authenticator = xmlstream.ConnectAuthenticator('otherHost')
- self.xmlstream = xmlstream.XmlStream(self.authenticator)
- self.xmlstream.addObserver('//event/stream/authd', self.onAuthenticated)
- self.xmlstream.addObserver('//event/xmpp/initfailed', self.onInitFailed)
-
-
- def onAuthenticated(self, obj):
- self.gotAuthenticated = True
-
-
- def onInitFailed(self, failure):
- self.initFailure = failure
-
-
- def testSucces(self):
- """
- Test successful completion of an initialization step.
- """
- class Initializer:
- def initialize(self):
- pass
-
- init = Initializer()
- self.xmlstream.initializers = [init]
-
- self.authenticator.initializeStream()
- self.assertEqual([], self.xmlstream.initializers)
- self.assertTrue(self.gotAuthenticated)
-
-
- def testFailure(self):
- """
- Test failure of an initialization step.
- """
- class Initializer:
- def initialize(self):
- raise TestError
-
- init = Initializer()
- self.xmlstream.initializers = [init]
-
- self.authenticator.initializeStream()
- self.assertEqual([init], self.xmlstream.initializers)
- self.assertFalse(self.gotAuthenticated)
- self.assertNotIdentical(None, self.initFailure)
- self.assertTrue(self.initFailure.check(TestError))
-
-
- def test_streamStart(self):
- """
- Test streamStart to fill the appropriate attributes from the
- stream header.
- """
- self.authenticator.namespace = 'testns'
- xs = self.xmlstream
- xs.makeConnection(proto_helpers.StringTransport())
- xs.dataReceived("<stream:stream xmlns='jabber:client' "
- "xmlns:stream='http://etherx.jabber.org/streams' "
- "from='example.com' to='example.org' id='12345' "
- "version='1.0'>")
- self.assertEqual((1, 0), xs.version)
- self.assertEqual('12345', xs.sid)
- self.assertEqual('testns', xs.namespace)
- self.assertEqual('example.com', xs.otherEntity.host)
- self.assertIdentical(None, xs.thisEntity)
- self.assertNot(self.gotAuthenticated)
- xs.dataReceived("<stream:features>"
- "<test xmlns='testns'/>"
- "</stream:features>")
- self.assertIn(('testns', 'test'), xs.features)
- self.assertTrue(self.gotAuthenticated)
-
-
-
-class ListenAuthenticatorTest(unittest.TestCase):
- def setUp(self):
- self.authenticator = xmlstream.ListenAuthenticator()
- self.xmlstream = xmlstream.XmlStream(self.authenticator)
-
-
- def test_streamStart(self):
- """
- Test streamStart to fill the appropriate attributes from the
- stream header.
- """
- xs = self.xmlstream
- xs.makeConnection(proto_helpers.StringTransport())
- xs.dataReceived("<stream:stream xmlns='jabber:client' "
- "xmlns:stream='http://etherx.jabber.org/streams' "
- "from='example.org' to='example.com' id='12345' "
- "version='1.0'>")
- self.assertEqual((1, 0), xs.version)
- self.assertIdentical(None, xs.sid)
- self.assertEqual('jabber:client', xs.namespace)
- self.assertIdentical(None, xs.otherEntity)
- self.assertEqual('example.com', xs.thisEntity.host)
-
-
-
-class TLSInitiatingInitializerTest(unittest.TestCase):
- def setUp(self):
- self.output = []
- self.done = []
-
- self.savedSSL = xmlstream.ssl
-
- self.authenticator = xmlstream.Authenticator()
- self.xmlstream = xmlstream.XmlStream(self.authenticator)
- self.xmlstream.send = self.output.append
- self.xmlstream.connectionMade()
- self.xmlstream.dataReceived("<stream:stream xmlns='jabber:client' "
- "xmlns:stream='http://etherx.jabber.org/streams' "
- "from='example.com' id='12345' version='1.0'>")
- self.init = xmlstream.TLSInitiatingInitializer(self.xmlstream)
-
-
- def tearDown(self):
- xmlstream.ssl = self.savedSSL
-
-
- def testWantedSupported(self):
- """
- Test start when TLS is wanted and the SSL library available.
- """
- self.xmlstream.transport = proto_helpers.StringTransport()
- self.xmlstream.transport.startTLS = lambda ctx: self.done.append('TLS')
- self.xmlstream.reset = lambda: self.done.append('reset')
- self.xmlstream.sendHeader = lambda: self.done.append('header')
-
- d = self.init.start()
- d.addCallback(self.assertEquals, xmlstream.Reset)
- starttls = self.output[0]
- self.assertEquals('starttls', starttls.name)
- self.assertEquals(NS_XMPP_TLS, starttls.uri)
- self.xmlstream.dataReceived("<proceed xmlns='%s'/>" % NS_XMPP_TLS)
- self.assertEquals(['TLS', 'reset', 'header'], self.done)
-
- return d
-
- if not xmlstream.ssl:
- testWantedSupported.skip = "SSL not available"
-
-
- def testWantedNotSupportedNotRequired(self):
- """
- Test start when TLS is wanted and the SSL library available.
- """
- xmlstream.ssl = None
-
- d = self.init.start()
- d.addCallback(self.assertEquals, None)
- self.assertEquals([], self.output)
-
- return d
-
-
- def testWantedNotSupportedRequired(self):
- """
- Test start when TLS is wanted and the SSL library available.
- """
- xmlstream.ssl = None
- self.init.required = True
-
- d = self.init.start()
- self.assertFailure(d, xmlstream.TLSNotSupported)
- self.assertEquals([], self.output)
-
- return d
-
-
- def testNotWantedRequired(self):
- """
- Test start when TLS is not wanted, but required by the server.
- """
- tls = domish.Element(('urn:ietf:params:xml:ns:xmpp-tls', 'starttls'))
- tls.addElement('required')
- self.xmlstream.features = {(tls.uri, tls.name): tls}
- self.init.wanted = False
-
- d = self.init.start()
- self.assertEquals([], self.output)
- self.assertFailure(d, xmlstream.TLSRequired)
-
- return d
-
-
- def testNotWantedNotRequired(self):
- """
- Test start when TLS is not wanted, but required by the server.
- """
- tls = domish.Element(('urn:ietf:params:xml:ns:xmpp-tls', 'starttls'))
- self.xmlstream.features = {(tls.uri, tls.name): tls}
- self.init.wanted = False
-
- d = self.init.start()
- d.addCallback(self.assertEqual, None)
- self.assertEquals([], self.output)
- return d
-
-
- def testFailed(self):
- """
- Test failed TLS negotiation.
- """
- # Pretend that ssl is supported, it isn't actually used when the
- # server starts out with a failure in response to our initial
- # C{starttls} stanza.
- xmlstream.ssl = 1
-
- d = self.init.start()
- self.assertFailure(d, xmlstream.TLSFailed)
- self.xmlstream.dataReceived("<failure xmlns='%s'/>" % NS_XMPP_TLS)
- return d
-
-
-
-class TestFeatureInitializer(xmlstream.BaseFeatureInitiatingInitializer):
- feature = ('testns', 'test')
-
- def start(self):
- return defer.succeed(None)
-
-
-
-class BaseFeatureInitiatingInitializerTest(unittest.TestCase):
-
- def setUp(self):
- self.xmlstream = xmlstream.XmlStream(xmlstream.Authenticator())
- self.init = TestFeatureInitializer(self.xmlstream)
-
-
- def testAdvertized(self):
- """
- Test that an advertized feature results in successful initialization.
- """
- self.xmlstream.features = {self.init.feature:
- domish.Element(self.init.feature)}
- return self.init.initialize()
-
-
- def testNotAdvertizedRequired(self):
- """
- Test that when the feature is not advertized, but required by the
- initializer, an exception is raised.
- """
- self.init.required = True
- self.assertRaises(xmlstream.FeatureNotAdvertized, self.init.initialize)
-
-
- def testNotAdvertizedNotRequired(self):
- """
- Test that when the feature is not advertized, and not required by the
- initializer, the initializer silently succeeds.
- """
- self.init.required = False
- self.assertIdentical(None, self.init.initialize())
-
-
-
-class ToResponseTest(unittest.TestCase):
-
- def test_toResponse(self):
- """
- Test that a response stanza is generated with addressing swapped.
- """
- stanza = domish.Element(('jabber:client', 'iq'))
- stanza['type'] = 'get'
- stanza['to'] = 'user1@example.com'
- stanza['from'] = 'user2@example.com/resource'
- stanza['id'] = 'stanza1'
- response = xmlstream.toResponse(stanza, 'result')
- self.assertNotIdentical(stanza, response)
- self.assertEqual(response['from'], 'user1@example.com')
- self.assertEqual(response['to'], 'user2@example.com/resource')
- self.assertEqual(response['type'], 'result')
- self.assertEqual(response['id'], 'stanza1')
-
-
- def test_toResponseNoFrom(self):
- """
- Test that a response is generated from a stanza without a from address.
- """
- stanza = domish.Element(('jabber:client', 'iq'))
- stanza['type'] = 'get'
- stanza['to'] = 'user1@example.com'
- response = xmlstream.toResponse(stanza)
- self.assertEqual(response['from'], 'user1@example.com')
- self.failIf(response.hasAttribute('to'))
-
-
- def test_toResponseNoTo(self):
- """
- Test that a response is generated from a stanza without a to address.
- """
- stanza = domish.Element(('jabber:client', 'iq'))
- stanza['type'] = 'get'
- stanza['from'] = 'user2@example.com/resource'
- response = xmlstream.toResponse(stanza)
- self.failIf(response.hasAttribute('from'))
- self.assertEqual(response['to'], 'user2@example.com/resource')
-
-
- def test_toResponseNoAddressing(self):
- """
- Test that a response is generated from a stanza without any addressing.
- """
- stanza = domish.Element(('jabber:client', 'message'))
- stanza['type'] = 'chat'
- response = xmlstream.toResponse(stanza)
- self.failIf(response.hasAttribute('to'))
- self.failIf(response.hasAttribute('from'))
-
-
- def test_noID(self):
- """
- Test that a proper response is generated without id attribute.
- """
- stanza = domish.Element(('jabber:client', 'message'))
- response = xmlstream.toResponse(stanza)
- self.failIf(response.hasAttribute('id'))
-
-
-
-class DummyFactory(object):
- """
- Dummy XmlStream factory that only registers bootstrap observers.
- """
- def __init__(self):
- self.callbacks = {}
-
-
- def addBootstrap(self, event, callback):
- self.callbacks[event] = callback
-
-
-
-class DummyXMPPHandler(xmlstream.XMPPHandler):
- """
- Dummy XMPP subprotocol handler to count the methods are called on it.
- """
- def __init__(self):
- self.doneMade = 0
- self.doneInitialized = 0
- self.doneLost = 0
-
-
- def makeConnection(self, xs):
- self.connectionMade()
-
-
- def connectionMade(self):
- self.doneMade += 1
-
-
- def connectionInitialized(self):
- self.doneInitialized += 1
-
-
- def connectionLost(self, reason):
- self.doneLost += 1
-
-
-
-class XMPPHandlerTest(unittest.TestCase):
- """
- Tests for L{xmlstream.XMPPHandler}.
- """
-
- def test_interface(self):
- """
- L{xmlstream.XMPPHandler} implements L{ijabber.IXMPPHandler}.
- """
- verifyObject(ijabber.IXMPPHandler, xmlstream.XMPPHandler())
-
-
- def test_send(self):
- """
- Test that data is passed on for sending by the stream manager.
- """
- class DummyStreamManager(object):
- def __init__(self):
- self.outlist = []
-
- def send(self, data):
- self.outlist.append(data)
-
- handler = xmlstream.XMPPHandler()
- handler.parent = DummyStreamManager()
- handler.send('<presence/>')
- self.assertEquals(['<presence/>'], handler.parent.outlist)
-
-
- def test_makeConnection(self):
- """
- Test that makeConnection saves the XML stream and calls connectionMade.
- """
- class TestXMPPHandler(xmlstream.XMPPHandler):
- def connectionMade(self):
- self.doneMade = True
-
- handler = TestXMPPHandler()
- xs = xmlstream.XmlStream(xmlstream.Authenticator())
- handler.makeConnection(xs)
- self.assertTrue(handler.doneMade)
- self.assertIdentical(xs, handler.xmlstream)
-
-
- def test_connectionLost(self):
- """
- Test that connectionLost forgets the XML stream.
- """
- handler = xmlstream.XMPPHandler()
- xs = xmlstream.XmlStream(xmlstream.Authenticator())
- handler.makeConnection(xs)
- handler.connectionLost(Exception())
- self.assertIdentical(None, handler.xmlstream)
-
-
-
-class XMPPHandlerCollectionTest(unittest.TestCase):
- """
- Tests for L{xmlstream.XMPPHandlerCollection}.
- """
-
- def setUp(self):
- self.collection = xmlstream.XMPPHandlerCollection()
-
-
- def test_interface(self):
- """
- L{xmlstream.StreamManager} implements L{ijabber.IXMPPHandlerCollection}.
- """
- verifyObject(ijabber.IXMPPHandlerCollection, self.collection)
-
-
- def test_addHandler(self):
- """
- Test the addition of a protocol handler.
- """
- handler = DummyXMPPHandler()
- handler.setHandlerParent(self.collection)
- self.assertIn(handler, self.collection)
- self.assertIdentical(self.collection, handler.parent)
-
-
- def test_removeHandler(self):
- """
- Test removal of a protocol handler.
- """
- handler = DummyXMPPHandler()
- handler.setHandlerParent(self.collection)
- handler.disownHandlerParent(self.collection)
- self.assertNotIn(handler, self.collection)
- self.assertIdentical(None, handler.parent)
-
-
-
-class StreamManagerTest(unittest.TestCase):
- """
- Tests for L{xmlstream.StreamManager}.
- """
-
- def setUp(self):
- factory = DummyFactory()
- self.streamManager = xmlstream.StreamManager(factory)
-
-
- def test_basic(self):
- """
- Test correct initialization and setup of factory observers.
- """
- sm = self.streamManager
- self.assertIdentical(None, sm.xmlstream)
- self.assertEquals([], sm.handlers)
- self.assertEquals(sm._connected,
- sm.factory.callbacks['//event/stream/connected'])
- self.assertEquals(sm._authd,
- sm.factory.callbacks['//event/stream/authd'])
- self.assertEquals(sm._disconnected,
- sm.factory.callbacks['//event/stream/end'])
- self.assertEquals(sm.initializationFailed,
- sm.factory.callbacks['//event/xmpp/initfailed'])
-
-
- def test_connected(self):
- """
- Test that protocol handlers have their connectionMade method called
- when the XML stream is connected.
- """
- sm = self.streamManager
- handler = DummyXMPPHandler()
- handler.setHandlerParent(sm)
- xs = xmlstream.XmlStream(xmlstream.Authenticator())
- sm._connected(xs)
- self.assertEquals(1, handler.doneMade)
- self.assertEquals(0, handler.doneInitialized)
- self.assertEquals(0, handler.doneLost)
-
-
- def test_connectedLogTrafficFalse(self):
- """
- Test raw data functions unset when logTraffic is set to False.
- """
- sm = self.streamManager
- handler = DummyXMPPHandler()
- handler.setHandlerParent(sm)
- xs = xmlstream.XmlStream(xmlstream.Authenticator())
- sm._connected(xs)
- self.assertIdentical(None, xs.rawDataInFn)
- self.assertIdentical(None, xs.rawDataOutFn)
-
-
- def test_connectedLogTrafficTrue(self):
- """
- Test raw data functions set when logTraffic is set to True.
- """
- sm = self.streamManager
- sm.logTraffic = True
- handler = DummyXMPPHandler()
- handler.setHandlerParent(sm)
- xs = xmlstream.XmlStream(xmlstream.Authenticator())
- sm._connected(xs)
- self.assertNotIdentical(None, xs.rawDataInFn)
- self.assertNotIdentical(None, xs.rawDataOutFn)
-
-
- def test_authd(self):
- """
- Test that protocol handlers have their connectionInitialized method
- called when the XML stream is initialized.
- """
- sm = self.streamManager
- handler = DummyXMPPHandler()
- handler.setHandlerParent(sm)
- xs = xmlstream.XmlStream(xmlstream.Authenticator())
- sm._authd(xs)
- self.assertEquals(0, handler.doneMade)
- self.assertEquals(1, handler.doneInitialized)
- self.assertEquals(0, handler.doneLost)
-
-
- def test_disconnected(self):
- """
- Test that protocol handlers have their connectionLost method
- called when the XML stream is disconnected.
- """
- sm = self.streamManager
- handler = DummyXMPPHandler()
- handler.setHandlerParent(sm)
- xs = xmlstream.XmlStream(xmlstream.Authenticator())
- sm._disconnected(xs)
- self.assertEquals(0, handler.doneMade)
- self.assertEquals(0, handler.doneInitialized)
- self.assertEquals(1, handler.doneLost)
-
-
- def test_addHandler(self):
- """
- Test the addition of a protocol handler while not connected.
- """
- sm = self.streamManager
- handler = DummyXMPPHandler()
- handler.setHandlerParent(sm)
-
- self.assertEquals(0, handler.doneMade)
- self.assertEquals(0, handler.doneInitialized)
- self.assertEquals(0, handler.doneLost)
-
-
- def test_addHandlerInitialized(self):
- """
- Test the addition of a protocol handler after the stream
- have been initialized.
-
- Make sure that the handler will have the connected stream
- passed via C{makeConnection} and have C{connectionInitialized}
- called.
- """
- sm = self.streamManager
- xs = xmlstream.XmlStream(xmlstream.Authenticator())
- sm._connected(xs)
- sm._authd(xs)
- handler = DummyXMPPHandler()
- handler.setHandlerParent(sm)
-
- self.assertEquals(1, handler.doneMade)
- self.assertEquals(1, handler.doneInitialized)
- self.assertEquals(0, handler.doneLost)
-
-
- def test_sendInitialized(self):
- """
- Test send when the stream has been initialized.
-
- The data should be sent directly over the XML stream.
- """
- factory = xmlstream.XmlStreamFactory(xmlstream.Authenticator())
- sm = xmlstream.StreamManager(factory)
- xs = factory.buildProtocol(None)
- xs.transport = proto_helpers.StringTransport()
- xs.connectionMade()
- xs.dataReceived("<stream:stream xmlns='jabber:client' "
- "xmlns:stream='http://etherx.jabber.org/streams' "
- "from='example.com' id='12345'>")
- xs.dispatch(xs, "//event/stream/authd")
- sm.send("<presence/>")
- self.assertEquals("<presence/>", xs.transport.value())
-
-
- def test_sendNotConnected(self):
- """
- Test send when there is no established XML stream.
-
- The data should be cached until an XML stream has been established and
- initialized.
- """
- factory = xmlstream.XmlStreamFactory(xmlstream.Authenticator())
- sm = xmlstream.StreamManager(factory)
- handler = DummyXMPPHandler()
- sm.addHandler(handler)
-
- xs = factory.buildProtocol(None)
- xs.transport = proto_helpers.StringTransport()
- sm.send("<presence/>")
- self.assertEquals("", xs.transport.value())
- self.assertEquals("<presence/>", sm._packetQueue[0])
-
- xs.connectionMade()
- self.assertEquals("", xs.transport.value())
- self.assertEquals("<presence/>", sm._packetQueue[0])
-
- xs.dataReceived("<stream:stream xmlns='jabber:client' "
- "xmlns:stream='http://etherx.jabber.org/streams' "
- "from='example.com' id='12345'>")
- xs.dispatch(xs, "//event/stream/authd")
-
- self.assertEquals("<presence/>", xs.transport.value())
- self.failIf(sm._packetQueue)
-
-
- def test_sendNotInitialized(self):
- """
- Test send when the stream is connected but not yet initialized.
-
- The data should be cached until the XML stream has been initialized.
- """
- factory = xmlstream.XmlStreamFactory(xmlstream.Authenticator())
- sm = xmlstream.StreamManager(factory)
- xs = factory.buildProtocol(None)
- xs.transport = proto_helpers.StringTransport()
- xs.connectionMade()
- xs.dataReceived("<stream:stream xmlns='jabber:client' "
- "xmlns:stream='http://etherx.jabber.org/streams' "
- "from='example.com' id='12345'>")
- sm.send("<presence/>")
- self.assertEquals("", xs.transport.value())
- self.assertEquals("<presence/>", sm._packetQueue[0])
-
-
- def test_sendDisconnected(self):
- """
- Test send after XML stream disconnection.
-
- The data should be cached until a new XML stream has been established
- and initialized.
- """
- factory = xmlstream.XmlStreamFactory(xmlstream.Authenticator())
- sm = xmlstream.StreamManager(factory)
- handler = DummyXMPPHandler()
- sm.addHandler(handler)
-
- xs = factory.buildProtocol(None)
- xs.connectionMade()
- xs.transport = proto_helpers.StringTransport()
- xs.connectionLost(None)
-
- sm.send("<presence/>")
- self.assertEquals("", xs.transport.value())
- self.assertEquals("<presence/>", sm._packetQueue[0])

Powered by Google App Engine
This is Rietveld 408576698