Index: third_party/twisted_8_1/twisted/words/test/test_service.py |
diff --git a/third_party/twisted_8_1/twisted/words/test/test_service.py b/third_party/twisted_8_1/twisted/words/test/test_service.py |
deleted file mode 100644 |
index 3313663c5b0e8a1a825578f6b6fc0b4f50f30ba1..0000000000000000000000000000000000000000 |
--- a/third_party/twisted_8_1/twisted/words/test/test_service.py |
+++ /dev/null |
@@ -1,936 +0,0 @@ |
-# Copyright (c) 2001-2005 Twisted Matrix Laboratories. |
-# See LICENSE for details. |
- |
-from __future__ import generators |
- |
-import time |
- |
-from zope.interface import implements |
- |
-from twisted.trial import unittest |
-from twisted.test import proto_helpers |
- |
-from twisted.cred import portal, credentials, checkers |
-from twisted.words import ewords, iwords, service |
-from twisted.words.protocols import irc |
-from twisted.spread import pb |
-from twisted.internet.defer import Deferred, DeferredList, maybeDeferred, succeed |
-from twisted.internet.defer import deferredGenerator as dG, waitForDeferred as wFD |
-from twisted.internet import address, reactor |
- |
-class RealmTestCase(unittest.TestCase): |
- def _entityCreationTest(self, kind): |
- # Kind is "user" or "group" |
- realm = service.InMemoryWordsRealm("realmname") |
- |
- name = u'test' + kind.lower() |
- create = getattr(realm, 'create' + kind.title()) |
- get = getattr(realm, 'get' + kind.title()) |
- flag = 'create' + kind.title() + 'OnRequest' |
- dupExc = getattr(ewords, 'Duplicate' + kind.title()) |
- noSuchExc = getattr(ewords, 'NoSuch' + kind.title()) |
- |
- # Creating should succeed |
- d = wFD(create(name)) |
- yield d |
- p = d.getResult() |
- self.assertEquals(p.name, name) |
- |
- # Creating the same user again should not |
- d = wFD(create(name)) |
- yield d |
- self.assertRaises(dupExc, d.getResult) |
- |
- # Getting a non-existent user should succeed if createUserOnRequest is True |
- setattr(realm, flag, True) |
- d = wFD(get(u"new" + kind.lower())) |
- yield d |
- p = d.getResult() |
- self.assertEquals(p.name, "new" + kind.lower()) |
- |
- # Getting that user again should return the same object |
- d = wFD(get(u"new" + kind.lower())) |
- yield d |
- newp = d.getResult() |
- self.assertIdentical(p, newp) |
- |
- # Getting a non-existent user should fail if createUserOnRequest is False |
- setattr(realm, flag, False) |
- d = wFD(get(u"another" + kind.lower())) |
- yield d |
- self.assertRaises(noSuchExc, d.getResult) |
- _entityCreationTest = dG(_entityCreationTest) |
- |
- |
- def testUserCreation(self): |
- return self._entityCreationTest("User") |
- |
- |
- def testGroupCreation(self): |
- return self._entityCreationTest("Group") |
- |
- |
- def testUserRetrieval(self): |
- realm = service.InMemoryWordsRealm("realmname") |
- |
- # Make a user to play around with |
- d = wFD(realm.createUser(u"testuser")) |
- yield d |
- user = d.getResult() |
- |
- # Make sure getting the user returns the same object |
- d = wFD(realm.getUser(u"testuser")) |
- yield d |
- retrieved = d.getResult() |
- self.assertIdentical(user, retrieved) |
- |
- # Make sure looking up the user also returns the same object |
- d = wFD(realm.lookupUser(u"testuser")) |
- yield d |
- lookedUp = d.getResult() |
- self.assertIdentical(retrieved, lookedUp) |
- |
- # Make sure looking up a user who does not exist fails |
- d = wFD(realm.lookupUser(u"nosuchuser")) |
- yield d |
- self.assertRaises(ewords.NoSuchUser, d.getResult) |
- testUserRetrieval = dG(testUserRetrieval) |
- |
- |
- def testUserAddition(self): |
- realm = service.InMemoryWordsRealm("realmname") |
- |
- # Create and manually add a user to the realm |
- p = service.User("testuser") |
- d = wFD(realm.addUser(p)) |
- yield d |
- user = d.getResult() |
- self.assertIdentical(p, user) |
- |
- # Make sure getting that user returns the same object |
- d = wFD(realm.getUser(u"testuser")) |
- yield d |
- retrieved = d.getResult() |
- self.assertIdentical(user, retrieved) |
- |
- # Make sure looking up that user returns the same object |
- d = wFD(realm.lookupUser(u"testuser")) |
- yield d |
- lookedUp = d.getResult() |
- self.assertIdentical(retrieved, lookedUp) |
- testUserAddition = dG(testUserAddition) |
- |
- |
- def testGroupRetrieval(self): |
- realm = service.InMemoryWordsRealm("realmname") |
- |
- d = wFD(realm.createGroup(u"testgroup")) |
- yield d |
- group = d.getResult() |
- |
- d = wFD(realm.getGroup(u"testgroup")) |
- yield d |
- retrieved = d.getResult() |
- |
- self.assertIdentical(group, retrieved) |
- |
- d = wFD(realm.getGroup(u"nosuchgroup")) |
- yield d |
- self.assertRaises(ewords.NoSuchGroup, d.getResult) |
- testGroupRetrieval = dG(testGroupRetrieval) |
- |
- |
- def testGroupAddition(self): |
- realm = service.InMemoryWordsRealm("realmname") |
- |
- p = service.Group("testgroup") |
- d = wFD(realm.addGroup(p)) |
- yield d |
- d.getResult() |
- |
- d = wFD(realm.getGroup(u"testGroup")) |
- yield d |
- group = d.getResult() |
- |
- self.assertIdentical(p, group) |
- testGroupAddition = dG(testGroupAddition) |
- |
- |
- def testGroupUsernameCollision(self): |
- """ |
- Try creating a group with the same name as an existing user and |
- assert that it succeeds, since users and groups should not be in the |
- same namespace and collisions should be impossible. |
- """ |
- realm = service.InMemoryWordsRealm("realmname") |
- |
- d = wFD(realm.createUser(u"test")) |
- yield d |
- user = d.getResult() |
- |
- d = wFD(realm.createGroup(u"test")) |
- yield d |
- group = d.getResult() |
- testGroupUsernameCollision = dG(testGroupUsernameCollision) |
- |
- |
- def testEnumeration(self): |
- realm = service.InMemoryWordsRealm("realmname") |
- d = wFD(realm.createGroup(u"groupone")) |
- yield d |
- d.getResult() |
- |
- d = wFD(realm.createGroup(u"grouptwo")) |
- yield d |
- d.getResult() |
- |
- groups = wFD(realm.itergroups()) |
- yield groups |
- groups = groups.getResult() |
- |
- n = [g.name for g in groups] |
- n.sort() |
- self.assertEquals(n, ["groupone", "grouptwo"]) |
- testEnumeration = dG(testEnumeration) |
- |
- |
-class TestGroup(object): |
- def __init__(self, name, size, topic): |
- self.name = name |
- self.size = lambda: size |
- self.meta = {'topic': topic} |
- |
- |
-class TestUser(object): |
- def __init__(self, name, groups, signOn, lastMessage): |
- self.name = name |
- self.itergroups = lambda: iter([TestGroup(g, 3, 'Hello') for g in groups]) |
- self.signOn = signOn |
- self.lastMessage = lastMessage |
- |
- |
-class TestPortal(object): |
- def __init__(self): |
- self.logins = [] |
- |
- def login(self, credentials, mind, *interfaces): |
- d = Deferred() |
- self.logins.append((credentials, mind, interfaces, d)) |
- return d |
- |
- |
-class TestCaseUserAgg(object): |
- def __init__(self, user, realm, factory, address=address.IPv4Address('TCP', '127.0.0.1', 54321)): |
- self.user = user |
- self.transport = proto_helpers.StringTransportWithDisconnection() |
- self.protocol = factory.buildProtocol(address) |
- self.transport.protocol = self.protocol |
- self.user.mind = self.protocol |
- self.protocol.makeConnection(self.transport) |
- |
- def write(self, stuff): |
- if isinstance(stuff, unicode): |
- stuff = stuff.encode('utf-8') |
- self.protocol.dataReceived(stuff) |
- |
- |
-class IRCProtocolTestCase(unittest.TestCase): |
- STATIC_USERS = [ |
- u'useruser', u'otheruser', u'someguy', u'firstuser', u'username', |
- u'userone', u'usertwo', u'userthree', u'someuser'] |
- |
- def setUp(self): |
- self.realm = service.InMemoryWordsRealm("realmname") |
- self.checker = checkers.InMemoryUsernamePasswordDatabaseDontUse() |
- self.portal = portal.Portal(self.realm, [self.checker]) |
- self.factory = service.IRCFactory(self.realm, self.portal) |
- |
- c = [] |
- for nick in self.STATIC_USERS: |
- c.append(self.realm.createUser(nick)) |
- self.checker.addUser(nick.encode('ascii'), nick + "_password") |
- return DeferredList(c) |
- |
- |
- def _assertGreeting(self, user): |
- # Make sure we get 1-4 at least |
- response = self._response(user) |
- expected = range(1, 5) |
- for (prefix, command, args) in response: |
- try: |
- expected.remove(int(command)) |
- except KeyError: |
- pass |
- self.failIf(expected, "Missing responses for %r" % (expected,)) |
- |
- |
- def _login(self, user, nick, password=None): |
- if password is None: |
- password = nick + "_password" |
- user.write('PASS %s\r\n' % (password,)) |
- user.write('NICK %s extrainfo\r\n' % (nick,)) |
- |
- |
- def _loggedInUser(self, name): |
- d = wFD(self.realm.lookupUser(name)) |
- yield d |
- user = d.getResult() |
- agg = TestCaseUserAgg(user, self.realm, self.factory) |
- self._login(agg, name) |
- yield agg |
- _loggedInUser = dG(_loggedInUser) |
- |
- |
- def _response(self, user): |
- response = user.transport.value().splitlines() |
- user.transport.clear() |
- return map(irc.parsemsg, response) |
- |
- |
- def testPASSLogin(self): |
- user = wFD(self._loggedInUser(u'firstuser')) |
- yield user |
- user = user.getResult() |
- self._assertGreeting(user) |
- testPASSLogin = dG(testPASSLogin) |
- |
- |
- def testNickServLogin(self): |
- firstuser = wFD(self.realm.lookupUser(u'firstuser')) |
- yield firstuser |
- firstuser = firstuser.getResult() |
- |
- user = TestCaseUserAgg(firstuser, self.realm, self.factory) |
- user.write('NICK firstuser extrainfo\r\n') |
- response = self._response(user) |
- self.assertEquals(len(response), 1) |
- self.assertEquals(response[0][0], service.NICKSERV) |
- self.assertEquals(response[0][1], 'PRIVMSG') |
- self.assertEquals(response[0][2], ['firstuser', 'Password?']) |
- user.transport.clear() |
- |
- user.write('PRIVMSG nickserv firstuser_password\r\n') |
- self._assertGreeting(user) |
- testNickServLogin = dG(testNickServLogin) |
- |
- |
- def testFailedLogin(self): |
- firstuser = wFD(self.realm.lookupUser(u'firstuser')) |
- yield firstuser |
- firstuser = firstuser.getResult() |
- |
- user = TestCaseUserAgg(firstuser, self.realm, self.factory) |
- self._login(user, "firstuser", "wrongpass") |
- response = self._response(user) |
- self.assertEquals(len(response), 1) |
- self.assertEquals(response[0][2], ['firstuser', 'Login failed. Goodbye.']) |
- testFailedLogin = dG(testFailedLogin) |
- |
- |
- def testLogout(self): |
- logout = [] |
- firstuser = wFD(self.realm.lookupUser(u'firstuser')) |
- yield firstuser |
- firstuser = firstuser.getResult() |
- |
- user = TestCaseUserAgg(firstuser, self.realm, self.factory) |
- self._login(user, "firstuser") |
- user.protocol.logout = lambda: logout.append(True) |
- user.write('QUIT\r\n') |
- self.assertEquals(logout, [True]) |
- testLogout = dG(testLogout) |
- |
- |
- def testJoin(self): |
- firstuser = wFD(self.realm.lookupUser(u'firstuser')) |
- yield firstuser |
- firstuser = firstuser.getResult() |
- |
- somechannel = wFD(self.realm.createGroup(u"somechannel")) |
- yield somechannel |
- somechannel = somechannel.getResult() |
- |
- # Bring in one user, make sure he gets into the channel sanely |
- user = TestCaseUserAgg(firstuser, self.realm, self.factory) |
- self._login(user, "firstuser") |
- user.transport.clear() |
- user.write('JOIN #somechannel\r\n') |
- |
- response = self._response(user) |
- self.assertEquals(len(response), 5) |
- |
- # Join message |
- self.assertEquals(response[0][0], 'firstuser!firstuser@realmname') |
- self.assertEquals(response[0][1], 'JOIN') |
- self.assertEquals(response[0][2], ['#somechannel']) |
- |
- # User list |
- self.assertEquals(response[1][1], '353') |
- self.assertEquals(response[2][1], '366') |
- |
- # Topic (or lack thereof, as the case may be) |
- self.assertEquals(response[3][1], '332') |
- self.assertEquals(response[4][1], '333') |
- |
- |
- # Hook up another client! It is a CHAT SYSTEM!!!!!!! |
- other = wFD(self._loggedInUser(u'otheruser')) |
- yield other |
- other = other.getResult() |
- |
- other.transport.clear() |
- user.transport.clear() |
- other.write('JOIN #somechannel\r\n') |
- |
- # At this point, both users should be in the channel |
- response = self._response(other) |
- |
- event = self._response(user) |
- self.assertEquals(len(event), 1) |
- self.assertEquals(event[0][0], 'otheruser!otheruser@realmname') |
- self.assertEquals(event[0][1], 'JOIN') |
- self.assertEquals(event[0][2], ['#somechannel']) |
- |
- self.assertEquals(response[1][0], 'realmname') |
- self.assertEquals(response[1][1], '353') |
- self.assertEquals(response[1][2], ['otheruser', '=', '#somechannel', 'firstuser otheruser']) |
- testJoin = dG(testJoin) |
- |
- |
- def testLeave(self): |
- user = wFD(self._loggedInUser(u'useruser')) |
- yield user |
- user = user.getResult() |
- |
- somechannel = wFD(self.realm.createGroup(u"somechannel")) |
- yield somechannel |
- somechannel = somechannel.getResult() |
- |
- user.write('JOIN #somechannel\r\n') |
- user.transport.clear() |
- |
- other = wFD(self._loggedInUser(u'otheruser')) |
- yield other |
- other = other.getResult() |
- |
- other.write('JOIN #somechannel\r\n') |
- |
- user.transport.clear() |
- other.transport.clear() |
- |
- user.write('PART #somechannel\r\n') |
- |
- response = self._response(user) |
- event = self._response(other) |
- |
- self.assertEquals(len(response), 1) |
- self.assertEquals(response[0][0], 'useruser!useruser@realmname') |
- self.assertEquals(response[0][1], 'PART') |
- self.assertEquals(response[0][2], ['#somechannel', 'leaving']) |
- self.assertEquals(response, event) |
- |
- # Now again, with a part message |
- user.write('JOIN #somechannel\r\n') |
- |
- user.transport.clear() |
- other.transport.clear() |
- |
- user.write('PART #somechannel :goodbye stupidheads\r\n') |
- |
- response = self._response(user) |
- event = self._response(other) |
- |
- self.assertEquals(len(response), 1) |
- self.assertEquals(response[0][0], 'useruser!useruser@realmname') |
- self.assertEquals(response[0][1], 'PART') |
- self.assertEquals(response[0][2], ['#somechannel', 'goodbye stupidheads']) |
- self.assertEquals(response, event) |
- testLeave = dG(testLeave) |
- |
- |
- def testGetTopic(self): |
- user = wFD(self._loggedInUser(u'useruser')) |
- yield user |
- user = user.getResult() |
- |
- group = service.Group("somechannel") |
- group.meta["topic"] = "This is a test topic." |
- group.meta["topic_author"] = "some_fellow" |
- group.meta["topic_date"] = 77777777 |
- |
- add = wFD(self.realm.addGroup(group)) |
- yield add |
- add.getResult() |
- |
- user.transport.clear() |
- user.write("JOIN #somechannel\r\n") |
- |
- response = self._response(user) |
- |
- self.assertEquals(response[3][0], 'realmname') |
- self.assertEquals(response[3][1], '332') |
- |
- # XXX Sigh. irc.parsemsg() is not as correct as one might hope. |
- self.assertEquals(response[3][2], ['useruser', '#somechannel', 'This is a test topic.']) |
- self.assertEquals(response[4][1], '333') |
- self.assertEquals(response[4][2], ['useruser', '#somechannel', 'some_fellow', '77777777']) |
- |
- user.transport.clear() |
- |
- user.write('TOPIC #somechannel\r\n') |
- |
- response = self._response(user) |
- |
- self.assertEquals(response[0][1], '332') |
- self.assertEquals(response[0][2], ['useruser', '#somechannel', 'This is a test topic.']) |
- self.assertEquals(response[1][1], '333') |
- self.assertEquals(response[1][2], ['useruser', '#somechannel', 'some_fellow', '77777777']) |
- testGetTopic = dG(testGetTopic) |
- |
- |
- def testSetTopic(self): |
- user = wFD(self._loggedInUser(u'useruser')) |
- yield user |
- user = user.getResult() |
- |
- add = wFD(self.realm.createGroup(u"somechannel")) |
- yield add |
- somechannel = add.getResult() |
- |
- user.write("JOIN #somechannel\r\n") |
- |
- other = wFD(self._loggedInUser(u'otheruser')) |
- yield other |
- other = other.getResult() |
- |
- other.write("JOIN #somechannel\r\n") |
- |
- user.transport.clear() |
- other.transport.clear() |
- |
- other.write('TOPIC #somechannel :This is the new topic.\r\n') |
- |
- response = self._response(other) |
- event = self._response(user) |
- |
- self.assertEquals(response, event) |
- |
- self.assertEquals(response[0][0], 'otheruser!otheruser@realmname') |
- self.assertEquals(response[0][1], 'TOPIC') |
- self.assertEquals(response[0][2], ['#somechannel', 'This is the new topic.']) |
- |
- other.transport.clear() |
- |
- somechannel.meta['topic_date'] = 12345 |
- other.write('TOPIC #somechannel\r\n') |
- |
- response = self._response(other) |
- self.assertEquals(response[0][1], '332') |
- self.assertEquals(response[0][2], ['otheruser', '#somechannel', 'This is the new topic.']) |
- self.assertEquals(response[1][1], '333') |
- self.assertEquals(response[1][2], ['otheruser', '#somechannel', 'otheruser', '12345']) |
- |
- other.transport.clear() |
- other.write('TOPIC #asdlkjasd\r\n') |
- |
- response = self._response(other) |
- self.assertEquals(response[0][1], '403') |
- testSetTopic = dG(testSetTopic) |
- |
- |
- def testGroupMessage(self): |
- user = wFD(self._loggedInUser(u'useruser')) |
- yield user |
- user = user.getResult() |
- |
- add = wFD(self.realm.createGroup(u"somechannel")) |
- yield add |
- somechannel = add.getResult() |
- |
- user.write("JOIN #somechannel\r\n") |
- |
- other = wFD(self._loggedInUser(u'otheruser')) |
- yield other |
- other = other.getResult() |
- |
- other.write("JOIN #somechannel\r\n") |
- |
- user.transport.clear() |
- other.transport.clear() |
- |
- user.write('PRIVMSG #somechannel :Hello, world.\r\n') |
- |
- response = self._response(user) |
- event = self._response(other) |
- |
- self.failIf(response) |
- self.assertEquals(len(event), 1) |
- self.assertEquals(event[0][0], 'useruser!useruser@realmname') |
- self.assertEquals(event[0][1], 'PRIVMSG', -1) |
- self.assertEquals(event[0][2], ['#somechannel', 'Hello, world.']) |
- testGroupMessage = dG(testGroupMessage) |
- |
- |
- def testPrivateMessage(self): |
- user = wFD(self._loggedInUser(u'useruser')) |
- yield user |
- user = user.getResult() |
- |
- other = wFD(self._loggedInUser(u'otheruser')) |
- yield other |
- other = other.getResult() |
- |
- user.transport.clear() |
- other.transport.clear() |
- |
- user.write('PRIVMSG otheruser :Hello, monkey.\r\n') |
- |
- response = self._response(user) |
- event = self._response(other) |
- |
- self.failIf(response) |
- self.assertEquals(len(event), 1) |
- self.assertEquals(event[0][0], 'useruser!useruser@realmname') |
- self.assertEquals(event[0][1], 'PRIVMSG') |
- self.assertEquals(event[0][2], ['otheruser', 'Hello, monkey.']) |
- |
- user.write('PRIVMSG nousernamedthis :Hello, monkey.\r\n') |
- |
- response = self._response(user) |
- |
- self.assertEquals(len(response), 1) |
- self.assertEquals(response[0][0], 'realmname') |
- self.assertEquals(response[0][1], '401') |
- self.assertEquals(response[0][2], ['useruser', 'nousernamedthis', 'No such nick/channel.']) |
- testPrivateMessage = dG(testPrivateMessage) |
- |
- |
- def testOper(self): |
- user = wFD(self._loggedInUser(u'useruser')) |
- yield user |
- user = user.getResult() |
- |
- user.transport.clear() |
- user.write('OPER user pass\r\n') |
- response = self._response(user) |
- |
- self.assertEquals(len(response), 1) |
- self.assertEquals(response[0][1], '491') |
- testOper = dG(testOper) |
- |
- |
- def testGetUserMode(self): |
- user = wFD(self._loggedInUser(u'useruser')) |
- yield user |
- user = user.getResult() |
- |
- user.transport.clear() |
- user.write('MODE useruser\r\n') |
- |
- response = self._response(user) |
- self.assertEquals(len(response), 1) |
- self.assertEquals(response[0][0], 'realmname') |
- self.assertEquals(response[0][1], '221') |
- self.assertEquals(response[0][2], ['useruser', '+']) |
- testGetUserMode = dG(testGetUserMode) |
- |
- |
- def testSetUserMode(self): |
- user = wFD(self._loggedInUser(u'useruser')) |
- yield user |
- user = user.getResult() |
- |
- user.transport.clear() |
- user.write('MODE useruser +abcd\r\n') |
- |
- response = self._response(user) |
- self.assertEquals(len(response), 1) |
- self.assertEquals(response[0][1], '472') |
- testSetUserMode = dG(testSetUserMode) |
- |
- |
- def testGetGroupMode(self): |
- user = wFD(self._loggedInUser(u'useruser')) |
- yield user |
- user = user.getResult() |
- |
- add = wFD(self.realm.createGroup(u"somechannel")) |
- yield add |
- somechannel = add.getResult() |
- |
- user.write('JOIN #somechannel\r\n') |
- |
- user.transport.clear() |
- user.write('MODE #somechannel\r\n') |
- |
- response = self._response(user) |
- self.assertEquals(len(response), 1) |
- self.assertEquals(response[0][1], '324') |
- testGetGroupMode = dG(testGetGroupMode) |
- |
- |
- def testSetGroupMode(self): |
- user = wFD(self._loggedInUser(u'useruser')) |
- yield user |
- user = user.getResult() |
- |
- group = wFD(self.realm.createGroup(u"groupname")) |
- yield group |
- group = group.getResult() |
- |
- user.write('JOIN #groupname\r\n') |
- |
- user.transport.clear() |
- user.write('MODE #groupname +abcd\r\n') |
- |
- response = self._response(user) |
- self.assertEquals(len(response), 1) |
- self.assertEquals(response[0][1], '472') |
- testSetGroupMode = dG(testSetGroupMode) |
- |
- |
- def testWho(self): |
- group = service.Group('groupname') |
- add = wFD(self.realm.addGroup(group)) |
- yield add |
- add.getResult() |
- |
- users = [] |
- for nick in u'userone', u'usertwo', u'userthree': |
- u = wFD(self._loggedInUser(nick)) |
- yield u |
- u = u.getResult() |
- users.append(u) |
- users[-1].write('JOIN #groupname\r\n') |
- for user in users: |
- user.transport.clear() |
- |
- users[0].write('WHO #groupname\r\n') |
- |
- r = self._response(users[0]) |
- self.failIf(self._response(users[1])) |
- self.failIf(self._response(users[2])) |
- |
- wantusers = ['userone', 'usertwo', 'userthree'] |
- for (prefix, code, stuff) in r[:-1]: |
- self.assertEquals(prefix, 'realmname') |
- self.assertEquals(code, '352') |
- |
- (myname, group, theirname, theirhost, theirserver, theirnick, flag, extra) = stuff |
- self.assertEquals(myname, 'userone') |
- self.assertEquals(group, '#groupname') |
- self.failUnless(theirname in wantusers) |
- self.assertEquals(theirhost, 'realmname') |
- self.assertEquals(theirserver, 'realmname') |
- wantusers.remove(theirnick) |
- self.assertEquals(flag, 'H') |
- self.assertEquals(extra, '0 ' + theirnick) |
- self.failIf(wantusers) |
- |
- prefix, code, stuff = r[-1] |
- self.assertEquals(prefix, 'realmname') |
- self.assertEquals(code, '315') |
- myname, channel, extra = stuff |
- self.assertEquals(myname, 'userone') |
- self.assertEquals(channel, '#groupname') |
- self.assertEquals(extra, 'End of /WHO list.') |
- testWho = dG(testWho) |
- |
- |
- def testList(self): |
- user = wFD(self._loggedInUser(u"someuser")) |
- yield user |
- user = user.getResult() |
- user.transport.clear() |
- |
- somegroup = wFD(self.realm.createGroup(u"somegroup")) |
- yield somegroup |
- somegroup = somegroup.getResult() |
- somegroup.size = lambda: succeed(17) |
- somegroup.meta['topic'] = 'this is the topic woo' |
- |
- # Test one group |
- user.write('LIST #somegroup\r\n') |
- |
- r = self._response(user) |
- self.assertEquals(len(r), 2) |
- resp, end = r |
- |
- self.assertEquals(resp[0], 'realmname') |
- self.assertEquals(resp[1], '322') |
- self.assertEquals(resp[2][0], 'someuser') |
- self.assertEquals(resp[2][1], 'somegroup') |
- self.assertEquals(resp[2][2], '17') |
- self.assertEquals(resp[2][3], 'this is the topic woo') |
- |
- self.assertEquals(end[0], 'realmname') |
- self.assertEquals(end[1], '323') |
- self.assertEquals(end[2][0], 'someuser') |
- self.assertEquals(end[2][1], 'End of /LIST') |
- |
- user.transport.clear() |
- # Test all groups |
- |
- user.write('LIST\r\n') |
- r = self._response(user) |
- self.assertEquals(len(r), 2) |
- |
- fg1, end = r |
- |
- self.assertEquals(fg1[1], '322') |
- self.assertEquals(fg1[2][1], 'somegroup') |
- self.assertEquals(fg1[2][2], '17') |
- self.assertEquals(fg1[2][3], 'this is the topic woo') |
- |
- self.assertEquals(end[1], '323') |
- testList = dG(testList) |
- |
- |
- def testWhois(self): |
- user = wFD(self._loggedInUser(u'someguy')) |
- yield user |
- user = user.getResult() |
- |
- otherguy = service.User("otherguy") |
- otherguy.itergroups = lambda: iter([ |
- service.Group('groupA'), |
- service.Group('groupB')]) |
- otherguy.signOn = 10 |
- otherguy.lastMessage = time.time() - 15 |
- |
- add = wFD(self.realm.addUser(otherguy)) |
- yield add |
- add.getResult() |
- |
- user.transport.clear() |
- user.write('WHOIS otherguy\r\n') |
- r = self._response(user) |
- |
- self.assertEquals(len(r), 5) |
- wuser, wserver, idle, channels, end = r |
- |
- self.assertEquals(wuser[0], 'realmname') |
- self.assertEquals(wuser[1], '311') |
- self.assertEquals(wuser[2][0], 'someguy') |
- self.assertEquals(wuser[2][1], 'otherguy') |
- self.assertEquals(wuser[2][2], 'otherguy') |
- self.assertEquals(wuser[2][3], 'realmname') |
- self.assertEquals(wuser[2][4], '*') |
- self.assertEquals(wuser[2][5], 'otherguy') |
- |
- self.assertEquals(wserver[0], 'realmname') |
- self.assertEquals(wserver[1], '312') |
- self.assertEquals(wserver[2][0], 'someguy') |
- self.assertEquals(wserver[2][1], 'otherguy') |
- self.assertEquals(wserver[2][2], 'realmname') |
- self.assertEquals(wserver[2][3], 'Hi mom!') |
- |
- self.assertEquals(idle[0], 'realmname') |
- self.assertEquals(idle[1], '317') |
- self.assertEquals(idle[2][0], 'someguy') |
- self.assertEquals(idle[2][1], 'otherguy') |
- self.assertEquals(idle[2][2], '15') |
- self.assertEquals(idle[2][3], '10') |
- self.assertEquals(idle[2][4], "seconds idle, signon time") |
- |
- self.assertEquals(channels[0], 'realmname') |
- self.assertEquals(channels[1], '319') |
- self.assertEquals(channels[2][0], 'someguy') |
- self.assertEquals(channels[2][1], 'otherguy') |
- self.assertEquals(channels[2][2], '#groupA #groupB') |
- |
- self.assertEquals(end[0], 'realmname') |
- self.assertEquals(end[1], '318') |
- self.assertEquals(end[2][0], 'someguy') |
- self.assertEquals(end[2][1], 'otherguy') |
- self.assertEquals(end[2][2], 'End of WHOIS list.') |
- testWhois = dG(testWhois) |
- |
- |
-class TestMind(service.PBMind): |
- def __init__(self, *a, **kw): |
- self.joins = [] |
- self.parts = [] |
- self.messages = [] |
- self.meta = [] |
- |
- def remote_userJoined(self, user, group): |
- self.joins.append((user, group)) |
- |
- def remote_userLeft(self, user, group, reason): |
- self.parts.append((user, group, reason)) |
- |
- def remote_receive(self, sender, recipient, message): |
- self.messages.append((sender, recipient, message)) |
- |
- def remote_groupMetaUpdate(self, group, meta): |
- self.meta.append((group, meta)) |
-pb.setUnjellyableForClass(TestMind, service.PBMindReference) |
- |
- |
-class PBProtocolTestCase(unittest.TestCase): |
- def setUp(self): |
- self.realm = service.InMemoryWordsRealm("realmname") |
- self.checker = checkers.InMemoryUsernamePasswordDatabaseDontUse() |
- self.portal = portal.Portal( |
- self.realm, [self.checker]) |
- self.serverFactory = pb.PBServerFactory(self.portal) |
- self.serverFactory.protocol = self._protocolFactory |
- self.serverFactory.unsafeTracebacks = True |
- self.clientFactory = pb.PBClientFactory() |
- self.clientFactory.unsafeTracebacks = True |
- self.serverPort = reactor.listenTCP(0, self.serverFactory) |
- self.clientConn = reactor.connectTCP( |
- '127.0.0.1', |
- self.serverPort.getHost().port, |
- self.clientFactory) |
- |
- def _protocolFactory(self, *args, **kw): |
- self._serverProtocol = pb.Broker(0) |
- return self._serverProtocol |
- |
- def tearDown(self): |
- d3 = Deferred() |
- self._serverProtocol.notifyOnDisconnect(lambda: d3.callback(None)) |
- return DeferredList([ |
- maybeDeferred(self.serverPort.stopListening), |
- maybeDeferred(self.clientConn.disconnect), d3]) |
- |
- def _loggedInAvatar(self, name, password, mind): |
- creds = credentials.UsernamePassword(name, password) |
- self.checker.addUser(name.encode('ascii'), password) |
- d = self.realm.createUser(name) |
- d.addCallback(lambda ign: self.clientFactory.login(creds, mind)) |
- return d |
- |
- def testGroups(self): |
- mindone = TestMind() |
- one = wFD(self._loggedInAvatar(u"one", "p1", mindone)) |
- yield one |
- one = one.getResult() |
- |
- mindtwo = TestMind() |
- two = wFD(self._loggedInAvatar(u"two", "p2", mindtwo)) |
- yield two |
- two = two.getResult() |
- |
- add = wFD(self.realm.createGroup(u"foobar")) |
- yield add |
- add.getResult() |
- |
- groupone = wFD(one.join(u"foobar")) |
- yield groupone |
- groupone = groupone.getResult() |
- |
- grouptwo = wFD(two.join(u"foobar")) |
- yield grouptwo |
- grouptwo = grouptwo.getResult() |
- |
- msg = wFD(groupone.send({"text": "hello, monkeys"})) |
- yield msg |
- msg = msg.getResult() |
- |
- leave = wFD(groupone.leave()) |
- yield leave |
- leave = leave.getResult() |
- testGroups = dG(testGroups) |