Index: sync/tools/testserver/xmppserver_test.py |
diff --git a/sync/tools/testserver/xmppserver_test.py b/sync/tools/testserver/xmppserver_test.py |
deleted file mode 100755 |
index 1a539d1af99e771cfa71438a51d0f5ad3e9a660e..0000000000000000000000000000000000000000 |
--- a/sync/tools/testserver/xmppserver_test.py |
+++ /dev/null |
@@ -1,421 +0,0 @@ |
-#!/usr/bin/env python |
-# Copyright 2013 The Chromium Authors. All rights reserved. |
-# Use of this source code is governed by a BSD-style license that can be |
-# found in the LICENSE file. |
- |
-"""Tests exercising the various classes in xmppserver.py.""" |
- |
-import unittest |
- |
-import base64 |
-import xmppserver |
- |
-class XmlUtilsTest(unittest.TestCase): |
- |
- def testParseXml(self): |
- xml_text = """<foo xmlns=""><bar xmlns=""><baz/></bar></foo>""" |
- xml = xmppserver.ParseXml(xml_text) |
- self.assertEqual(xml.toxml(), xml_text) |
- |
- def testCloneXml(self): |
- xml = xmppserver.ParseXml('<foo/>') |
- xml_clone = xmppserver.CloneXml(xml) |
- xml_clone.setAttribute('bar', 'baz') |
- self.assertEqual(xml, xml) |
- self.assertEqual(xml_clone, xml_clone) |
- self.assertNotEqual(xml, xml_clone) |
- |
- def testCloneXmlUnlink(self): |
- xml_text = '<foo/>' |
- xml = xmppserver.ParseXml(xml_text) |
- xml_clone = xmppserver.CloneXml(xml) |
- xml.unlink() |
- self.assertEqual(xml.parentNode, None) |
- self.assertNotEqual(xml_clone.parentNode, None) |
- self.assertEqual(xml_clone.toxml(), xml_text) |
- |
-class StanzaParserTest(unittest.TestCase): |
- |
- def setUp(self): |
- self.stanzas = [] |
- |
- def FeedStanza(self, stanza): |
- # We can't append stanza directly because it is unlinked after |
- # this callback. |
- self.stanzas.append(stanza.toxml()) |
- |
- def testBasic(self): |
- parser = xmppserver.StanzaParser(self) |
- parser.FeedString('<foo') |
- self.assertEqual(len(self.stanzas), 0) |
- parser.FeedString('/><bar></bar>') |
- self.assertEqual(self.stanzas[0], '<foo/>') |
- self.assertEqual(self.stanzas[1], '<bar/>') |
- |
- def testStream(self): |
- parser = xmppserver.StanzaParser(self) |
- parser.FeedString('<stream') |
- self.assertEqual(len(self.stanzas), 0) |
- parser.FeedString(':stream foo="bar" xmlns:stream="baz">') |
- self.assertEqual(self.stanzas[0], |
- '<stream:stream foo="bar" xmlns:stream="baz"/>') |
- |
- def testNested(self): |
- parser = xmppserver.StanzaParser(self) |
- parser.FeedString('<foo') |
- self.assertEqual(len(self.stanzas), 0) |
- parser.FeedString(' bar="baz"') |
- parser.FeedString('><baz/><blah>meh</blah></foo>') |
- self.assertEqual(self.stanzas[0], |
- '<foo bar="baz"><baz/><blah>meh</blah></foo>') |
- |
- |
-class JidTest(unittest.TestCase): |
- |
- def testBasic(self): |
- jid = xmppserver.Jid('foo', 'bar.com') |
- self.assertEqual(str(jid), 'foo@bar.com') |
- |
- def testResource(self): |
- jid = xmppserver.Jid('foo', 'bar.com', 'resource') |
- self.assertEqual(str(jid), 'foo@bar.com/resource') |
- |
- def testGetBareJid(self): |
- jid = xmppserver.Jid('foo', 'bar.com', 'resource') |
- self.assertEqual(str(jid.GetBareJid()), 'foo@bar.com') |
- |
- |
-class IdGeneratorTest(unittest.TestCase): |
- |
- def testBasic(self): |
- id_generator = xmppserver.IdGenerator('foo') |
- for i in xrange(0, 100): |
- self.assertEqual('foo.%d' % i, id_generator.GetNextId()) |
- |
- |
-class HandshakeTaskTest(unittest.TestCase): |
- |
- def setUp(self): |
- self.Reset() |
- |
- def Reset(self): |
- self.data_received = 0 |
- self.handshake_done = False |
- self.jid = None |
- |
- def SendData(self, _): |
- self.data_received += 1 |
- |
- def SendStanza(self, _, unused=True): |
- self.data_received += 1 |
- |
- def HandshakeDone(self, jid): |
- self.handshake_done = True |
- self.jid = jid |
- |
- def DoHandshake(self, resource_prefix, resource, username, |
- initial_stream_domain, auth_domain, auth_stream_domain): |
- self.Reset() |
- handshake_task = ( |
- xmppserver.HandshakeTask(self, resource_prefix, True)) |
- stream_xml = xmppserver.ParseXml('<stream:stream xmlns:stream="foo"/>') |
- stream_xml.setAttribute('to', initial_stream_domain) |
- self.assertEqual(self.data_received, 0) |
- handshake_task.FeedStanza(stream_xml) |
- self.assertEqual(self.data_received, 2) |
- |
- if auth_domain: |
- username_domain = '%s@%s' % (username, auth_domain) |
- else: |
- username_domain = username |
- auth_string = base64.b64encode('\0%s\0bar' % username_domain) |
- auth_xml = xmppserver.ParseXml('<auth>%s</auth>'% auth_string) |
- handshake_task.FeedStanza(auth_xml) |
- self.assertEqual(self.data_received, 3) |
- |
- stream_xml = xmppserver.ParseXml('<stream:stream xmlns:stream="foo"/>') |
- stream_xml.setAttribute('to', auth_stream_domain) |
- handshake_task.FeedStanza(stream_xml) |
- self.assertEqual(self.data_received, 5) |
- |
- bind_xml = xmppserver.ParseXml( |
- '<iq type="set"><bind><resource>%s</resource></bind></iq>' % resource) |
- handshake_task.FeedStanza(bind_xml) |
- self.assertEqual(self.data_received, 6) |
- |
- self.assertFalse(self.handshake_done) |
- |
- session_xml = xmppserver.ParseXml( |
- '<iq type="set"><session></session></iq>') |
- handshake_task.FeedStanza(session_xml) |
- self.assertEqual(self.data_received, 7) |
- |
- self.assertTrue(self.handshake_done) |
- |
- self.assertEqual(self.jid.username, username) |
- self.assertEqual(self.jid.domain, |
- auth_stream_domain or auth_domain or |
- initial_stream_domain) |
- self.assertEqual(self.jid.resource, |
- '%s.%s' % (resource_prefix, resource)) |
- |
- handshake_task.FeedStanza('<ignored/>') |
- self.assertEqual(self.data_received, 7) |
- |
- def DoHandshakeUnauthenticated(self, resource_prefix, resource, username, |
- initial_stream_domain): |
- self.Reset() |
- handshake_task = ( |
- xmppserver.HandshakeTask(self, resource_prefix, False)) |
- stream_xml = xmppserver.ParseXml('<stream:stream xmlns:stream="foo"/>') |
- stream_xml.setAttribute('to', initial_stream_domain) |
- self.assertEqual(self.data_received, 0) |
- handshake_task.FeedStanza(stream_xml) |
- self.assertEqual(self.data_received, 2) |
- |
- self.assertFalse(self.handshake_done) |
- |
- auth_string = base64.b64encode('\0%s\0bar' % username) |
- auth_xml = xmppserver.ParseXml('<auth>%s</auth>'% auth_string) |
- handshake_task.FeedStanza(auth_xml) |
- self.assertEqual(self.data_received, 3) |
- |
- self.assertTrue(self.handshake_done) |
- |
- self.assertEqual(self.jid, None) |
- |
- handshake_task.FeedStanza('<ignored/>') |
- self.assertEqual(self.data_received, 3) |
- |
- def testBasic(self): |
- self.DoHandshake('resource_prefix', 'resource', |
- 'foo', 'bar.com', 'baz.com', 'quux.com') |
- |
- def testDomainBehavior(self): |
- self.DoHandshake('resource_prefix', 'resource', |
- 'foo', 'bar.com', 'baz.com', 'quux.com') |
- self.DoHandshake('resource_prefix', 'resource', |
- 'foo', 'bar.com', 'baz.com', '') |
- self.DoHandshake('resource_prefix', 'resource', |
- 'foo', 'bar.com', '', '') |
- self.DoHandshake('resource_prefix', 'resource', |
- 'foo', '', '', '') |
- |
- def testBasicUnauthenticated(self): |
- self.DoHandshakeUnauthenticated('resource_prefix', 'resource', |
- 'foo', 'bar.com') |
- |
- |
-class FakeSocket(object): |
- """A fake socket object used for testing. |
- """ |
- |
- def __init__(self): |
- self._sent_data = [] |
- |
- def GetSentData(self): |
- return self._sent_data |
- |
- # socket-like methods. |
- def fileno(self): |
- return 0 |
- |
- def setblocking(self, int): |
- pass |
- |
- def getpeername(self): |
- return ('', 0) |
- |
- def send(self, data): |
- self._sent_data.append(data) |
- pass |
- |
- def close(self): |
- pass |
- |
- |
-class XmppConnectionTest(unittest.TestCase): |
- |
- def setUp(self): |
- self.connections = set() |
- self.fake_socket = FakeSocket() |
- |
- # XmppConnection delegate methods. |
- def OnXmppHandshakeDone(self, xmpp_connection): |
- self.connections.add(xmpp_connection) |
- |
- def OnXmppConnectionClosed(self, xmpp_connection): |
- self.connections.discard(xmpp_connection) |
- |
- def ForwardNotification(self, unused_xmpp_connection, notification_stanza): |
- for connection in self.connections: |
- connection.ForwardNotification(notification_stanza) |
- |
- def testBasic(self): |
- socket_map = {} |
- xmpp_connection = xmppserver.XmppConnection( |
- self.fake_socket, socket_map, self, ('', 0), True) |
- self.assertEqual(len(socket_map), 1) |
- self.assertEqual(len(self.connections), 0) |
- xmpp_connection.HandshakeDone(xmppserver.Jid('foo', 'bar')) |
- self.assertEqual(len(socket_map), 1) |
- self.assertEqual(len(self.connections), 1) |
- |
- sent_data = self.fake_socket.GetSentData() |
- |
- # Test subscription request. |
- self.assertEqual(len(sent_data), 0) |
- xmpp_connection.collect_incoming_data( |
- '<iq><subscribe xmlns="google:push"></subscribe></iq>') |
- self.assertEqual(len(sent_data), 1) |
- |
- # Test acks. |
- xmpp_connection.collect_incoming_data('<iq type="result"/>') |
- self.assertEqual(len(sent_data), 1) |
- |
- # Test notification. |
- xmpp_connection.collect_incoming_data( |
- '<message><push xmlns="google:push"/></message>') |
- self.assertEqual(len(sent_data), 2) |
- |
- # Test unexpected stanza. |
- def SendUnexpectedStanza(): |
- xmpp_connection.collect_incoming_data('<foo/>') |
- self.assertRaises(xmppserver.UnexpectedXml, SendUnexpectedStanza) |
- |
- # Test unexpected notifier command. |
- def SendUnexpectedNotifierCommand(): |
- xmpp_connection.collect_incoming_data( |
- '<iq><foo xmlns="google:notifier"/></iq>') |
- self.assertRaises(xmppserver.UnexpectedXml, |
- SendUnexpectedNotifierCommand) |
- |
- # Test close. |
- xmpp_connection.close() |
- self.assertEqual(len(socket_map), 0) |
- self.assertEqual(len(self.connections), 0) |
- |
- def testBasicUnauthenticated(self): |
- socket_map = {} |
- xmpp_connection = xmppserver.XmppConnection( |
- self.fake_socket, socket_map, self, ('', 0), False) |
- self.assertEqual(len(socket_map), 1) |
- self.assertEqual(len(self.connections), 0) |
- xmpp_connection.HandshakeDone(None) |
- self.assertEqual(len(socket_map), 0) |
- self.assertEqual(len(self.connections), 0) |
- |
- # Test unexpected stanza. |
- def SendUnexpectedStanza(): |
- xmpp_connection.collect_incoming_data('<foo/>') |
- self.assertRaises(xmppserver.UnexpectedXml, SendUnexpectedStanza) |
- |
- # Test redundant close. |
- xmpp_connection.close() |
- self.assertEqual(len(socket_map), 0) |
- self.assertEqual(len(self.connections), 0) |
- |
- |
-class FakeXmppServer(xmppserver.XmppServer): |
- """A fake XMPP server object used for testing. |
- """ |
- |
- def __init__(self): |
- self._socket_map = {} |
- self._fake_sockets = set() |
- self._next_jid_suffix = 1 |
- xmppserver.XmppServer.__init__(self, self._socket_map, ('', 0)) |
- |
- def GetSocketMap(self): |
- return self._socket_map |
- |
- def GetFakeSockets(self): |
- return self._fake_sockets |
- |
- def AddHandshakeCompletedConnection(self): |
- """Creates a new XMPP connection and completes its handshake. |
- """ |
- xmpp_connection = self.handle_accept() |
- jid = xmppserver.Jid('user%s' % self._next_jid_suffix, 'domain.com') |
- self._next_jid_suffix += 1 |
- xmpp_connection.HandshakeDone(jid) |
- |
- # XmppServer overrides. |
- def accept(self): |
- fake_socket = FakeSocket() |
- self._fake_sockets.add(fake_socket) |
- return (fake_socket, ('', 0)) |
- |
- def close(self): |
- self._fake_sockets.clear() |
- xmppserver.XmppServer.close(self) |
- |
- |
-class XmppServerTest(unittest.TestCase): |
- |
- def setUp(self): |
- self.xmpp_server = FakeXmppServer() |
- |
- def AssertSentDataLength(self, expected_length): |
- for fake_socket in self.xmpp_server.GetFakeSockets(): |
- self.assertEqual(len(fake_socket.GetSentData()), expected_length) |
- |
- def testBasic(self): |
- socket_map = self.xmpp_server.GetSocketMap() |
- self.assertEqual(len(socket_map), 1) |
- self.xmpp_server.AddHandshakeCompletedConnection() |
- self.assertEqual(len(socket_map), 2) |
- self.xmpp_server.close() |
- self.assertEqual(len(socket_map), 0) |
- |
- def testMakeNotification(self): |
- notification = self.xmpp_server.MakeNotification('channel', 'data') |
- expected_xml = ( |
- '<message>' |
- ' <push channel="channel" xmlns="google:push">' |
- ' <data>%s</data>' |
- ' </push>' |
- '</message>' % base64.b64encode('data')) |
- self.assertEqual(notification.toxml(), expected_xml) |
- |
- def testSendNotification(self): |
- # Add a few connections. |
- for _ in xrange(0, 7): |
- self.xmpp_server.AddHandshakeCompletedConnection() |
- |
- self.assertEqual(len(self.xmpp_server.GetFakeSockets()), 7) |
- |
- self.AssertSentDataLength(0) |
- self.xmpp_server.SendNotification('channel', 'data') |
- self.AssertSentDataLength(1) |
- |
- def testEnableDisableNotifications(self): |
- # Add a few connections. |
- for _ in xrange(0, 5): |
- self.xmpp_server.AddHandshakeCompletedConnection() |
- |
- self.assertEqual(len(self.xmpp_server.GetFakeSockets()), 5) |
- |
- self.AssertSentDataLength(0) |
- self.xmpp_server.SendNotification('channel', 'data') |
- self.AssertSentDataLength(1) |
- |
- self.xmpp_server.EnableNotifications() |
- self.xmpp_server.SendNotification('channel', 'data') |
- self.AssertSentDataLength(2) |
- |
- self.xmpp_server.DisableNotifications() |
- self.xmpp_server.SendNotification('channel', 'data') |
- self.AssertSentDataLength(2) |
- |
- self.xmpp_server.DisableNotifications() |
- self.xmpp_server.SendNotification('channel', 'data') |
- self.AssertSentDataLength(2) |
- |
- self.xmpp_server.EnableNotifications() |
- self.xmpp_server.SendNotification('channel', 'data') |
- self.AssertSentDataLength(3) |
- |
- |
-if __name__ == '__main__': |
- unittest.main() |