| Index: third_party/twisted_8_1/twisted/words/test/test_service.py
|
| diff --git a/third_party/twisted_8_1/twisted/words/test/test_service.py b/third_party/twisted_8_1/twisted/words/test/test_service.py
|
| deleted file mode 100644
|
| index 3313663c5b0e8a1a825578f6b6fc0b4f50f30ba1..0000000000000000000000000000000000000000
|
| --- a/third_party/twisted_8_1/twisted/words/test/test_service.py
|
| +++ /dev/null
|
| @@ -1,936 +0,0 @@
|
| -# Copyright (c) 2001-2005 Twisted Matrix Laboratories.
|
| -# See LICENSE for details.
|
| -
|
| -from __future__ import generators
|
| -
|
| -import time
|
| -
|
| -from zope.interface import implements
|
| -
|
| -from twisted.trial import unittest
|
| -from twisted.test import proto_helpers
|
| -
|
| -from twisted.cred import portal, credentials, checkers
|
| -from twisted.words import ewords, iwords, service
|
| -from twisted.words.protocols import irc
|
| -from twisted.spread import pb
|
| -from twisted.internet.defer import Deferred, DeferredList, maybeDeferred, succeed
|
| -from twisted.internet.defer import deferredGenerator as dG, waitForDeferred as wFD
|
| -from twisted.internet import address, reactor
|
| -
|
| -class RealmTestCase(unittest.TestCase):
|
| - def _entityCreationTest(self, kind):
|
| - # Kind is "user" or "group"
|
| - realm = service.InMemoryWordsRealm("realmname")
|
| -
|
| - name = u'test' + kind.lower()
|
| - create = getattr(realm, 'create' + kind.title())
|
| - get = getattr(realm, 'get' + kind.title())
|
| - flag = 'create' + kind.title() + 'OnRequest'
|
| - dupExc = getattr(ewords, 'Duplicate' + kind.title())
|
| - noSuchExc = getattr(ewords, 'NoSuch' + kind.title())
|
| -
|
| - # Creating should succeed
|
| - d = wFD(create(name))
|
| - yield d
|
| - p = d.getResult()
|
| - self.assertEquals(p.name, name)
|
| -
|
| - # Creating the same user again should not
|
| - d = wFD(create(name))
|
| - yield d
|
| - self.assertRaises(dupExc, d.getResult)
|
| -
|
| - # Getting a non-existent user should succeed if createUserOnRequest is True
|
| - setattr(realm, flag, True)
|
| - d = wFD(get(u"new" + kind.lower()))
|
| - yield d
|
| - p = d.getResult()
|
| - self.assertEquals(p.name, "new" + kind.lower())
|
| -
|
| - # Getting that user again should return the same object
|
| - d = wFD(get(u"new" + kind.lower()))
|
| - yield d
|
| - newp = d.getResult()
|
| - self.assertIdentical(p, newp)
|
| -
|
| - # Getting a non-existent user should fail if createUserOnRequest is False
|
| - setattr(realm, flag, False)
|
| - d = wFD(get(u"another" + kind.lower()))
|
| - yield d
|
| - self.assertRaises(noSuchExc, d.getResult)
|
| - _entityCreationTest = dG(_entityCreationTest)
|
| -
|
| -
|
| - def testUserCreation(self):
|
| - return self._entityCreationTest("User")
|
| -
|
| -
|
| - def testGroupCreation(self):
|
| - return self._entityCreationTest("Group")
|
| -
|
| -
|
| - def testUserRetrieval(self):
|
| - realm = service.InMemoryWordsRealm("realmname")
|
| -
|
| - # Make a user to play around with
|
| - d = wFD(realm.createUser(u"testuser"))
|
| - yield d
|
| - user = d.getResult()
|
| -
|
| - # Make sure getting the user returns the same object
|
| - d = wFD(realm.getUser(u"testuser"))
|
| - yield d
|
| - retrieved = d.getResult()
|
| - self.assertIdentical(user, retrieved)
|
| -
|
| - # Make sure looking up the user also returns the same object
|
| - d = wFD(realm.lookupUser(u"testuser"))
|
| - yield d
|
| - lookedUp = d.getResult()
|
| - self.assertIdentical(retrieved, lookedUp)
|
| -
|
| - # Make sure looking up a user who does not exist fails
|
| - d = wFD(realm.lookupUser(u"nosuchuser"))
|
| - yield d
|
| - self.assertRaises(ewords.NoSuchUser, d.getResult)
|
| - testUserRetrieval = dG(testUserRetrieval)
|
| -
|
| -
|
| - def testUserAddition(self):
|
| - realm = service.InMemoryWordsRealm("realmname")
|
| -
|
| - # Create and manually add a user to the realm
|
| - p = service.User("testuser")
|
| - d = wFD(realm.addUser(p))
|
| - yield d
|
| - user = d.getResult()
|
| - self.assertIdentical(p, user)
|
| -
|
| - # Make sure getting that user returns the same object
|
| - d = wFD(realm.getUser(u"testuser"))
|
| - yield d
|
| - retrieved = d.getResult()
|
| - self.assertIdentical(user, retrieved)
|
| -
|
| - # Make sure looking up that user returns the same object
|
| - d = wFD(realm.lookupUser(u"testuser"))
|
| - yield d
|
| - lookedUp = d.getResult()
|
| - self.assertIdentical(retrieved, lookedUp)
|
| - testUserAddition = dG(testUserAddition)
|
| -
|
| -
|
| - def testGroupRetrieval(self):
|
| - realm = service.InMemoryWordsRealm("realmname")
|
| -
|
| - d = wFD(realm.createGroup(u"testgroup"))
|
| - yield d
|
| - group = d.getResult()
|
| -
|
| - d = wFD(realm.getGroup(u"testgroup"))
|
| - yield d
|
| - retrieved = d.getResult()
|
| -
|
| - self.assertIdentical(group, retrieved)
|
| -
|
| - d = wFD(realm.getGroup(u"nosuchgroup"))
|
| - yield d
|
| - self.assertRaises(ewords.NoSuchGroup, d.getResult)
|
| - testGroupRetrieval = dG(testGroupRetrieval)
|
| -
|
| -
|
| - def testGroupAddition(self):
|
| - realm = service.InMemoryWordsRealm("realmname")
|
| -
|
| - p = service.Group("testgroup")
|
| - d = wFD(realm.addGroup(p))
|
| - yield d
|
| - d.getResult()
|
| -
|
| - d = wFD(realm.getGroup(u"testGroup"))
|
| - yield d
|
| - group = d.getResult()
|
| -
|
| - self.assertIdentical(p, group)
|
| - testGroupAddition = dG(testGroupAddition)
|
| -
|
| -
|
| - def testGroupUsernameCollision(self):
|
| - """
|
| - Try creating a group with the same name as an existing user and
|
| - assert that it succeeds, since users and groups should not be in the
|
| - same namespace and collisions should be impossible.
|
| - """
|
| - realm = service.InMemoryWordsRealm("realmname")
|
| -
|
| - d = wFD(realm.createUser(u"test"))
|
| - yield d
|
| - user = d.getResult()
|
| -
|
| - d = wFD(realm.createGroup(u"test"))
|
| - yield d
|
| - group = d.getResult()
|
| - testGroupUsernameCollision = dG(testGroupUsernameCollision)
|
| -
|
| -
|
| - def testEnumeration(self):
|
| - realm = service.InMemoryWordsRealm("realmname")
|
| - d = wFD(realm.createGroup(u"groupone"))
|
| - yield d
|
| - d.getResult()
|
| -
|
| - d = wFD(realm.createGroup(u"grouptwo"))
|
| - yield d
|
| - d.getResult()
|
| -
|
| - groups = wFD(realm.itergroups())
|
| - yield groups
|
| - groups = groups.getResult()
|
| -
|
| - n = [g.name for g in groups]
|
| - n.sort()
|
| - self.assertEquals(n, ["groupone", "grouptwo"])
|
| - testEnumeration = dG(testEnumeration)
|
| -
|
| -
|
| -class TestGroup(object):
|
| - def __init__(self, name, size, topic):
|
| - self.name = name
|
| - self.size = lambda: size
|
| - self.meta = {'topic': topic}
|
| -
|
| -
|
| -class TestUser(object):
|
| - def __init__(self, name, groups, signOn, lastMessage):
|
| - self.name = name
|
| - self.itergroups = lambda: iter([TestGroup(g, 3, 'Hello') for g in groups])
|
| - self.signOn = signOn
|
| - self.lastMessage = lastMessage
|
| -
|
| -
|
| -class TestPortal(object):
|
| - def __init__(self):
|
| - self.logins = []
|
| -
|
| - def login(self, credentials, mind, *interfaces):
|
| - d = Deferred()
|
| - self.logins.append((credentials, mind, interfaces, d))
|
| - return d
|
| -
|
| -
|
| -class TestCaseUserAgg(object):
|
| - def __init__(self, user, realm, factory, address=address.IPv4Address('TCP', '127.0.0.1', 54321)):
|
| - self.user = user
|
| - self.transport = proto_helpers.StringTransportWithDisconnection()
|
| - self.protocol = factory.buildProtocol(address)
|
| - self.transport.protocol = self.protocol
|
| - self.user.mind = self.protocol
|
| - self.protocol.makeConnection(self.transport)
|
| -
|
| - def write(self, stuff):
|
| - if isinstance(stuff, unicode):
|
| - stuff = stuff.encode('utf-8')
|
| - self.protocol.dataReceived(stuff)
|
| -
|
| -
|
| -class IRCProtocolTestCase(unittest.TestCase):
|
| - STATIC_USERS = [
|
| - u'useruser', u'otheruser', u'someguy', u'firstuser', u'username',
|
| - u'userone', u'usertwo', u'userthree', u'someuser']
|
| -
|
| - def setUp(self):
|
| - self.realm = service.InMemoryWordsRealm("realmname")
|
| - self.checker = checkers.InMemoryUsernamePasswordDatabaseDontUse()
|
| - self.portal = portal.Portal(self.realm, [self.checker])
|
| - self.factory = service.IRCFactory(self.realm, self.portal)
|
| -
|
| - c = []
|
| - for nick in self.STATIC_USERS:
|
| - c.append(self.realm.createUser(nick))
|
| - self.checker.addUser(nick.encode('ascii'), nick + "_password")
|
| - return DeferredList(c)
|
| -
|
| -
|
| - def _assertGreeting(self, user):
|
| - # Make sure we get 1-4 at least
|
| - response = self._response(user)
|
| - expected = range(1, 5)
|
| - for (prefix, command, args) in response:
|
| - try:
|
| - expected.remove(int(command))
|
| - except KeyError:
|
| - pass
|
| - self.failIf(expected, "Missing responses for %r" % (expected,))
|
| -
|
| -
|
| - def _login(self, user, nick, password=None):
|
| - if password is None:
|
| - password = nick + "_password"
|
| - user.write('PASS %s\r\n' % (password,))
|
| - user.write('NICK %s extrainfo\r\n' % (nick,))
|
| -
|
| -
|
| - def _loggedInUser(self, name):
|
| - d = wFD(self.realm.lookupUser(name))
|
| - yield d
|
| - user = d.getResult()
|
| - agg = TestCaseUserAgg(user, self.realm, self.factory)
|
| - self._login(agg, name)
|
| - yield agg
|
| - _loggedInUser = dG(_loggedInUser)
|
| -
|
| -
|
| - def _response(self, user):
|
| - response = user.transport.value().splitlines()
|
| - user.transport.clear()
|
| - return map(irc.parsemsg, response)
|
| -
|
| -
|
| - def testPASSLogin(self):
|
| - user = wFD(self._loggedInUser(u'firstuser'))
|
| - yield user
|
| - user = user.getResult()
|
| - self._assertGreeting(user)
|
| - testPASSLogin = dG(testPASSLogin)
|
| -
|
| -
|
| - def testNickServLogin(self):
|
| - firstuser = wFD(self.realm.lookupUser(u'firstuser'))
|
| - yield firstuser
|
| - firstuser = firstuser.getResult()
|
| -
|
| - user = TestCaseUserAgg(firstuser, self.realm, self.factory)
|
| - user.write('NICK firstuser extrainfo\r\n')
|
| - response = self._response(user)
|
| - self.assertEquals(len(response), 1)
|
| - self.assertEquals(response[0][0], service.NICKSERV)
|
| - self.assertEquals(response[0][1], 'PRIVMSG')
|
| - self.assertEquals(response[0][2], ['firstuser', 'Password?'])
|
| - user.transport.clear()
|
| -
|
| - user.write('PRIVMSG nickserv firstuser_password\r\n')
|
| - self._assertGreeting(user)
|
| - testNickServLogin = dG(testNickServLogin)
|
| -
|
| -
|
| - def testFailedLogin(self):
|
| - firstuser = wFD(self.realm.lookupUser(u'firstuser'))
|
| - yield firstuser
|
| - firstuser = firstuser.getResult()
|
| -
|
| - user = TestCaseUserAgg(firstuser, self.realm, self.factory)
|
| - self._login(user, "firstuser", "wrongpass")
|
| - response = self._response(user)
|
| - self.assertEquals(len(response), 1)
|
| - self.assertEquals(response[0][2], ['firstuser', 'Login failed. Goodbye.'])
|
| - testFailedLogin = dG(testFailedLogin)
|
| -
|
| -
|
| - def testLogout(self):
|
| - logout = []
|
| - firstuser = wFD(self.realm.lookupUser(u'firstuser'))
|
| - yield firstuser
|
| - firstuser = firstuser.getResult()
|
| -
|
| - user = TestCaseUserAgg(firstuser, self.realm, self.factory)
|
| - self._login(user, "firstuser")
|
| - user.protocol.logout = lambda: logout.append(True)
|
| - user.write('QUIT\r\n')
|
| - self.assertEquals(logout, [True])
|
| - testLogout = dG(testLogout)
|
| -
|
| -
|
| - def testJoin(self):
|
| - firstuser = wFD(self.realm.lookupUser(u'firstuser'))
|
| - yield firstuser
|
| - firstuser = firstuser.getResult()
|
| -
|
| - somechannel = wFD(self.realm.createGroup(u"somechannel"))
|
| - yield somechannel
|
| - somechannel = somechannel.getResult()
|
| -
|
| - # Bring in one user, make sure he gets into the channel sanely
|
| - user = TestCaseUserAgg(firstuser, self.realm, self.factory)
|
| - self._login(user, "firstuser")
|
| - user.transport.clear()
|
| - user.write('JOIN #somechannel\r\n')
|
| -
|
| - response = self._response(user)
|
| - self.assertEquals(len(response), 5)
|
| -
|
| - # Join message
|
| - self.assertEquals(response[0][0], 'firstuser!firstuser@realmname')
|
| - self.assertEquals(response[0][1], 'JOIN')
|
| - self.assertEquals(response[0][2], ['#somechannel'])
|
| -
|
| - # User list
|
| - self.assertEquals(response[1][1], '353')
|
| - self.assertEquals(response[2][1], '366')
|
| -
|
| - # Topic (or lack thereof, as the case may be)
|
| - self.assertEquals(response[3][1], '332')
|
| - self.assertEquals(response[4][1], '333')
|
| -
|
| -
|
| - # Hook up another client! It is a CHAT SYSTEM!!!!!!!
|
| - other = wFD(self._loggedInUser(u'otheruser'))
|
| - yield other
|
| - other = other.getResult()
|
| -
|
| - other.transport.clear()
|
| - user.transport.clear()
|
| - other.write('JOIN #somechannel\r\n')
|
| -
|
| - # At this point, both users should be in the channel
|
| - response = self._response(other)
|
| -
|
| - event = self._response(user)
|
| - self.assertEquals(len(event), 1)
|
| - self.assertEquals(event[0][0], 'otheruser!otheruser@realmname')
|
| - self.assertEquals(event[0][1], 'JOIN')
|
| - self.assertEquals(event[0][2], ['#somechannel'])
|
| -
|
| - self.assertEquals(response[1][0], 'realmname')
|
| - self.assertEquals(response[1][1], '353')
|
| - self.assertEquals(response[1][2], ['otheruser', '=', '#somechannel', 'firstuser otheruser'])
|
| - testJoin = dG(testJoin)
|
| -
|
| -
|
| - def testLeave(self):
|
| - user = wFD(self._loggedInUser(u'useruser'))
|
| - yield user
|
| - user = user.getResult()
|
| -
|
| - somechannel = wFD(self.realm.createGroup(u"somechannel"))
|
| - yield somechannel
|
| - somechannel = somechannel.getResult()
|
| -
|
| - user.write('JOIN #somechannel\r\n')
|
| - user.transport.clear()
|
| -
|
| - other = wFD(self._loggedInUser(u'otheruser'))
|
| - yield other
|
| - other = other.getResult()
|
| -
|
| - other.write('JOIN #somechannel\r\n')
|
| -
|
| - user.transport.clear()
|
| - other.transport.clear()
|
| -
|
| - user.write('PART #somechannel\r\n')
|
| -
|
| - response = self._response(user)
|
| - event = self._response(other)
|
| -
|
| - self.assertEquals(len(response), 1)
|
| - self.assertEquals(response[0][0], 'useruser!useruser@realmname')
|
| - self.assertEquals(response[0][1], 'PART')
|
| - self.assertEquals(response[0][2], ['#somechannel', 'leaving'])
|
| - self.assertEquals(response, event)
|
| -
|
| - # Now again, with a part message
|
| - user.write('JOIN #somechannel\r\n')
|
| -
|
| - user.transport.clear()
|
| - other.transport.clear()
|
| -
|
| - user.write('PART #somechannel :goodbye stupidheads\r\n')
|
| -
|
| - response = self._response(user)
|
| - event = self._response(other)
|
| -
|
| - self.assertEquals(len(response), 1)
|
| - self.assertEquals(response[0][0], 'useruser!useruser@realmname')
|
| - self.assertEquals(response[0][1], 'PART')
|
| - self.assertEquals(response[0][2], ['#somechannel', 'goodbye stupidheads'])
|
| - self.assertEquals(response, event)
|
| - testLeave = dG(testLeave)
|
| -
|
| -
|
| - def testGetTopic(self):
|
| - user = wFD(self._loggedInUser(u'useruser'))
|
| - yield user
|
| - user = user.getResult()
|
| -
|
| - group = service.Group("somechannel")
|
| - group.meta["topic"] = "This is a test topic."
|
| - group.meta["topic_author"] = "some_fellow"
|
| - group.meta["topic_date"] = 77777777
|
| -
|
| - add = wFD(self.realm.addGroup(group))
|
| - yield add
|
| - add.getResult()
|
| -
|
| - user.transport.clear()
|
| - user.write("JOIN #somechannel\r\n")
|
| -
|
| - response = self._response(user)
|
| -
|
| - self.assertEquals(response[3][0], 'realmname')
|
| - self.assertEquals(response[3][1], '332')
|
| -
|
| - # XXX Sigh. irc.parsemsg() is not as correct as one might hope.
|
| - self.assertEquals(response[3][2], ['useruser', '#somechannel', 'This is a test topic.'])
|
| - self.assertEquals(response[4][1], '333')
|
| - self.assertEquals(response[4][2], ['useruser', '#somechannel', 'some_fellow', '77777777'])
|
| -
|
| - user.transport.clear()
|
| -
|
| - user.write('TOPIC #somechannel\r\n')
|
| -
|
| - response = self._response(user)
|
| -
|
| - self.assertEquals(response[0][1], '332')
|
| - self.assertEquals(response[0][2], ['useruser', '#somechannel', 'This is a test topic.'])
|
| - self.assertEquals(response[1][1], '333')
|
| - self.assertEquals(response[1][2], ['useruser', '#somechannel', 'some_fellow', '77777777'])
|
| - testGetTopic = dG(testGetTopic)
|
| -
|
| -
|
| - def testSetTopic(self):
|
| - user = wFD(self._loggedInUser(u'useruser'))
|
| - yield user
|
| - user = user.getResult()
|
| -
|
| - add = wFD(self.realm.createGroup(u"somechannel"))
|
| - yield add
|
| - somechannel = add.getResult()
|
| -
|
| - user.write("JOIN #somechannel\r\n")
|
| -
|
| - other = wFD(self._loggedInUser(u'otheruser'))
|
| - yield other
|
| - other = other.getResult()
|
| -
|
| - other.write("JOIN #somechannel\r\n")
|
| -
|
| - user.transport.clear()
|
| - other.transport.clear()
|
| -
|
| - other.write('TOPIC #somechannel :This is the new topic.\r\n')
|
| -
|
| - response = self._response(other)
|
| - event = self._response(user)
|
| -
|
| - self.assertEquals(response, event)
|
| -
|
| - self.assertEquals(response[0][0], 'otheruser!otheruser@realmname')
|
| - self.assertEquals(response[0][1], 'TOPIC')
|
| - self.assertEquals(response[0][2], ['#somechannel', 'This is the new topic.'])
|
| -
|
| - other.transport.clear()
|
| -
|
| - somechannel.meta['topic_date'] = 12345
|
| - other.write('TOPIC #somechannel\r\n')
|
| -
|
| - response = self._response(other)
|
| - self.assertEquals(response[0][1], '332')
|
| - self.assertEquals(response[0][2], ['otheruser', '#somechannel', 'This is the new topic.'])
|
| - self.assertEquals(response[1][1], '333')
|
| - self.assertEquals(response[1][2], ['otheruser', '#somechannel', 'otheruser', '12345'])
|
| -
|
| - other.transport.clear()
|
| - other.write('TOPIC #asdlkjasd\r\n')
|
| -
|
| - response = self._response(other)
|
| - self.assertEquals(response[0][1], '403')
|
| - testSetTopic = dG(testSetTopic)
|
| -
|
| -
|
| - def testGroupMessage(self):
|
| - user = wFD(self._loggedInUser(u'useruser'))
|
| - yield user
|
| - user = user.getResult()
|
| -
|
| - add = wFD(self.realm.createGroup(u"somechannel"))
|
| - yield add
|
| - somechannel = add.getResult()
|
| -
|
| - user.write("JOIN #somechannel\r\n")
|
| -
|
| - other = wFD(self._loggedInUser(u'otheruser'))
|
| - yield other
|
| - other = other.getResult()
|
| -
|
| - other.write("JOIN #somechannel\r\n")
|
| -
|
| - user.transport.clear()
|
| - other.transport.clear()
|
| -
|
| - user.write('PRIVMSG #somechannel :Hello, world.\r\n')
|
| -
|
| - response = self._response(user)
|
| - event = self._response(other)
|
| -
|
| - self.failIf(response)
|
| - self.assertEquals(len(event), 1)
|
| - self.assertEquals(event[0][0], 'useruser!useruser@realmname')
|
| - self.assertEquals(event[0][1], 'PRIVMSG', -1)
|
| - self.assertEquals(event[0][2], ['#somechannel', 'Hello, world.'])
|
| - testGroupMessage = dG(testGroupMessage)
|
| -
|
| -
|
| - def testPrivateMessage(self):
|
| - user = wFD(self._loggedInUser(u'useruser'))
|
| - yield user
|
| - user = user.getResult()
|
| -
|
| - other = wFD(self._loggedInUser(u'otheruser'))
|
| - yield other
|
| - other = other.getResult()
|
| -
|
| - user.transport.clear()
|
| - other.transport.clear()
|
| -
|
| - user.write('PRIVMSG otheruser :Hello, monkey.\r\n')
|
| -
|
| - response = self._response(user)
|
| - event = self._response(other)
|
| -
|
| - self.failIf(response)
|
| - self.assertEquals(len(event), 1)
|
| - self.assertEquals(event[0][0], 'useruser!useruser@realmname')
|
| - self.assertEquals(event[0][1], 'PRIVMSG')
|
| - self.assertEquals(event[0][2], ['otheruser', 'Hello, monkey.'])
|
| -
|
| - user.write('PRIVMSG nousernamedthis :Hello, monkey.\r\n')
|
| -
|
| - response = self._response(user)
|
| -
|
| - self.assertEquals(len(response), 1)
|
| - self.assertEquals(response[0][0], 'realmname')
|
| - self.assertEquals(response[0][1], '401')
|
| - self.assertEquals(response[0][2], ['useruser', 'nousernamedthis', 'No such nick/channel.'])
|
| - testPrivateMessage = dG(testPrivateMessage)
|
| -
|
| -
|
| - def testOper(self):
|
| - user = wFD(self._loggedInUser(u'useruser'))
|
| - yield user
|
| - user = user.getResult()
|
| -
|
| - user.transport.clear()
|
| - user.write('OPER user pass\r\n')
|
| - response = self._response(user)
|
| -
|
| - self.assertEquals(len(response), 1)
|
| - self.assertEquals(response[0][1], '491')
|
| - testOper = dG(testOper)
|
| -
|
| -
|
| - def testGetUserMode(self):
|
| - user = wFD(self._loggedInUser(u'useruser'))
|
| - yield user
|
| - user = user.getResult()
|
| -
|
| - user.transport.clear()
|
| - user.write('MODE useruser\r\n')
|
| -
|
| - response = self._response(user)
|
| - self.assertEquals(len(response), 1)
|
| - self.assertEquals(response[0][0], 'realmname')
|
| - self.assertEquals(response[0][1], '221')
|
| - self.assertEquals(response[0][2], ['useruser', '+'])
|
| - testGetUserMode = dG(testGetUserMode)
|
| -
|
| -
|
| - def testSetUserMode(self):
|
| - user = wFD(self._loggedInUser(u'useruser'))
|
| - yield user
|
| - user = user.getResult()
|
| -
|
| - user.transport.clear()
|
| - user.write('MODE useruser +abcd\r\n')
|
| -
|
| - response = self._response(user)
|
| - self.assertEquals(len(response), 1)
|
| - self.assertEquals(response[0][1], '472')
|
| - testSetUserMode = dG(testSetUserMode)
|
| -
|
| -
|
| - def testGetGroupMode(self):
|
| - user = wFD(self._loggedInUser(u'useruser'))
|
| - yield user
|
| - user = user.getResult()
|
| -
|
| - add = wFD(self.realm.createGroup(u"somechannel"))
|
| - yield add
|
| - somechannel = add.getResult()
|
| -
|
| - user.write('JOIN #somechannel\r\n')
|
| -
|
| - user.transport.clear()
|
| - user.write('MODE #somechannel\r\n')
|
| -
|
| - response = self._response(user)
|
| - self.assertEquals(len(response), 1)
|
| - self.assertEquals(response[0][1], '324')
|
| - testGetGroupMode = dG(testGetGroupMode)
|
| -
|
| -
|
| - def testSetGroupMode(self):
|
| - user = wFD(self._loggedInUser(u'useruser'))
|
| - yield user
|
| - user = user.getResult()
|
| -
|
| - group = wFD(self.realm.createGroup(u"groupname"))
|
| - yield group
|
| - group = group.getResult()
|
| -
|
| - user.write('JOIN #groupname\r\n')
|
| -
|
| - user.transport.clear()
|
| - user.write('MODE #groupname +abcd\r\n')
|
| -
|
| - response = self._response(user)
|
| - self.assertEquals(len(response), 1)
|
| - self.assertEquals(response[0][1], '472')
|
| - testSetGroupMode = dG(testSetGroupMode)
|
| -
|
| -
|
| - def testWho(self):
|
| - group = service.Group('groupname')
|
| - add = wFD(self.realm.addGroup(group))
|
| - yield add
|
| - add.getResult()
|
| -
|
| - users = []
|
| - for nick in u'userone', u'usertwo', u'userthree':
|
| - u = wFD(self._loggedInUser(nick))
|
| - yield u
|
| - u = u.getResult()
|
| - users.append(u)
|
| - users[-1].write('JOIN #groupname\r\n')
|
| - for user in users:
|
| - user.transport.clear()
|
| -
|
| - users[0].write('WHO #groupname\r\n')
|
| -
|
| - r = self._response(users[0])
|
| - self.failIf(self._response(users[1]))
|
| - self.failIf(self._response(users[2]))
|
| -
|
| - wantusers = ['userone', 'usertwo', 'userthree']
|
| - for (prefix, code, stuff) in r[:-1]:
|
| - self.assertEquals(prefix, 'realmname')
|
| - self.assertEquals(code, '352')
|
| -
|
| - (myname, group, theirname, theirhost, theirserver, theirnick, flag, extra) = stuff
|
| - self.assertEquals(myname, 'userone')
|
| - self.assertEquals(group, '#groupname')
|
| - self.failUnless(theirname in wantusers)
|
| - self.assertEquals(theirhost, 'realmname')
|
| - self.assertEquals(theirserver, 'realmname')
|
| - wantusers.remove(theirnick)
|
| - self.assertEquals(flag, 'H')
|
| - self.assertEquals(extra, '0 ' + theirnick)
|
| - self.failIf(wantusers)
|
| -
|
| - prefix, code, stuff = r[-1]
|
| - self.assertEquals(prefix, 'realmname')
|
| - self.assertEquals(code, '315')
|
| - myname, channel, extra = stuff
|
| - self.assertEquals(myname, 'userone')
|
| - self.assertEquals(channel, '#groupname')
|
| - self.assertEquals(extra, 'End of /WHO list.')
|
| - testWho = dG(testWho)
|
| -
|
| -
|
| - def testList(self):
|
| - user = wFD(self._loggedInUser(u"someuser"))
|
| - yield user
|
| - user = user.getResult()
|
| - user.transport.clear()
|
| -
|
| - somegroup = wFD(self.realm.createGroup(u"somegroup"))
|
| - yield somegroup
|
| - somegroup = somegroup.getResult()
|
| - somegroup.size = lambda: succeed(17)
|
| - somegroup.meta['topic'] = 'this is the topic woo'
|
| -
|
| - # Test one group
|
| - user.write('LIST #somegroup\r\n')
|
| -
|
| - r = self._response(user)
|
| - self.assertEquals(len(r), 2)
|
| - resp, end = r
|
| -
|
| - self.assertEquals(resp[0], 'realmname')
|
| - self.assertEquals(resp[1], '322')
|
| - self.assertEquals(resp[2][0], 'someuser')
|
| - self.assertEquals(resp[2][1], 'somegroup')
|
| - self.assertEquals(resp[2][2], '17')
|
| - self.assertEquals(resp[2][3], 'this is the topic woo')
|
| -
|
| - self.assertEquals(end[0], 'realmname')
|
| - self.assertEquals(end[1], '323')
|
| - self.assertEquals(end[2][0], 'someuser')
|
| - self.assertEquals(end[2][1], 'End of /LIST')
|
| -
|
| - user.transport.clear()
|
| - # Test all groups
|
| -
|
| - user.write('LIST\r\n')
|
| - r = self._response(user)
|
| - self.assertEquals(len(r), 2)
|
| -
|
| - fg1, end = r
|
| -
|
| - self.assertEquals(fg1[1], '322')
|
| - self.assertEquals(fg1[2][1], 'somegroup')
|
| - self.assertEquals(fg1[2][2], '17')
|
| - self.assertEquals(fg1[2][3], 'this is the topic woo')
|
| -
|
| - self.assertEquals(end[1], '323')
|
| - testList = dG(testList)
|
| -
|
| -
|
| - def testWhois(self):
|
| - user = wFD(self._loggedInUser(u'someguy'))
|
| - yield user
|
| - user = user.getResult()
|
| -
|
| - otherguy = service.User("otherguy")
|
| - otherguy.itergroups = lambda: iter([
|
| - service.Group('groupA'),
|
| - service.Group('groupB')])
|
| - otherguy.signOn = 10
|
| - otherguy.lastMessage = time.time() - 15
|
| -
|
| - add = wFD(self.realm.addUser(otherguy))
|
| - yield add
|
| - add.getResult()
|
| -
|
| - user.transport.clear()
|
| - user.write('WHOIS otherguy\r\n')
|
| - r = self._response(user)
|
| -
|
| - self.assertEquals(len(r), 5)
|
| - wuser, wserver, idle, channels, end = r
|
| -
|
| - self.assertEquals(wuser[0], 'realmname')
|
| - self.assertEquals(wuser[1], '311')
|
| - self.assertEquals(wuser[2][0], 'someguy')
|
| - self.assertEquals(wuser[2][1], 'otherguy')
|
| - self.assertEquals(wuser[2][2], 'otherguy')
|
| - self.assertEquals(wuser[2][3], 'realmname')
|
| - self.assertEquals(wuser[2][4], '*')
|
| - self.assertEquals(wuser[2][5], 'otherguy')
|
| -
|
| - self.assertEquals(wserver[0], 'realmname')
|
| - self.assertEquals(wserver[1], '312')
|
| - self.assertEquals(wserver[2][0], 'someguy')
|
| - self.assertEquals(wserver[2][1], 'otherguy')
|
| - self.assertEquals(wserver[2][2], 'realmname')
|
| - self.assertEquals(wserver[2][3], 'Hi mom!')
|
| -
|
| - self.assertEquals(idle[0], 'realmname')
|
| - self.assertEquals(idle[1], '317')
|
| - self.assertEquals(idle[2][0], 'someguy')
|
| - self.assertEquals(idle[2][1], 'otherguy')
|
| - self.assertEquals(idle[2][2], '15')
|
| - self.assertEquals(idle[2][3], '10')
|
| - self.assertEquals(idle[2][4], "seconds idle, signon time")
|
| -
|
| - self.assertEquals(channels[0], 'realmname')
|
| - self.assertEquals(channels[1], '319')
|
| - self.assertEquals(channels[2][0], 'someguy')
|
| - self.assertEquals(channels[2][1], 'otherguy')
|
| - self.assertEquals(channels[2][2], '#groupA #groupB')
|
| -
|
| - self.assertEquals(end[0], 'realmname')
|
| - self.assertEquals(end[1], '318')
|
| - self.assertEquals(end[2][0], 'someguy')
|
| - self.assertEquals(end[2][1], 'otherguy')
|
| - self.assertEquals(end[2][2], 'End of WHOIS list.')
|
| - testWhois = dG(testWhois)
|
| -
|
| -
|
| -class TestMind(service.PBMind):
|
| - def __init__(self, *a, **kw):
|
| - self.joins = []
|
| - self.parts = []
|
| - self.messages = []
|
| - self.meta = []
|
| -
|
| - def remote_userJoined(self, user, group):
|
| - self.joins.append((user, group))
|
| -
|
| - def remote_userLeft(self, user, group, reason):
|
| - self.parts.append((user, group, reason))
|
| -
|
| - def remote_receive(self, sender, recipient, message):
|
| - self.messages.append((sender, recipient, message))
|
| -
|
| - def remote_groupMetaUpdate(self, group, meta):
|
| - self.meta.append((group, meta))
|
| -pb.setUnjellyableForClass(TestMind, service.PBMindReference)
|
| -
|
| -
|
| -class PBProtocolTestCase(unittest.TestCase):
|
| - def setUp(self):
|
| - self.realm = service.InMemoryWordsRealm("realmname")
|
| - self.checker = checkers.InMemoryUsernamePasswordDatabaseDontUse()
|
| - self.portal = portal.Portal(
|
| - self.realm, [self.checker])
|
| - self.serverFactory = pb.PBServerFactory(self.portal)
|
| - self.serverFactory.protocol = self._protocolFactory
|
| - self.serverFactory.unsafeTracebacks = True
|
| - self.clientFactory = pb.PBClientFactory()
|
| - self.clientFactory.unsafeTracebacks = True
|
| - self.serverPort = reactor.listenTCP(0, self.serverFactory)
|
| - self.clientConn = reactor.connectTCP(
|
| - '127.0.0.1',
|
| - self.serverPort.getHost().port,
|
| - self.clientFactory)
|
| -
|
| - def _protocolFactory(self, *args, **kw):
|
| - self._serverProtocol = pb.Broker(0)
|
| - return self._serverProtocol
|
| -
|
| - def tearDown(self):
|
| - d3 = Deferred()
|
| - self._serverProtocol.notifyOnDisconnect(lambda: d3.callback(None))
|
| - return DeferredList([
|
| - maybeDeferred(self.serverPort.stopListening),
|
| - maybeDeferred(self.clientConn.disconnect), d3])
|
| -
|
| - def _loggedInAvatar(self, name, password, mind):
|
| - creds = credentials.UsernamePassword(name, password)
|
| - self.checker.addUser(name.encode('ascii'), password)
|
| - d = self.realm.createUser(name)
|
| - d.addCallback(lambda ign: self.clientFactory.login(creds, mind))
|
| - return d
|
| -
|
| - def testGroups(self):
|
| - mindone = TestMind()
|
| - one = wFD(self._loggedInAvatar(u"one", "p1", mindone))
|
| - yield one
|
| - one = one.getResult()
|
| -
|
| - mindtwo = TestMind()
|
| - two = wFD(self._loggedInAvatar(u"two", "p2", mindtwo))
|
| - yield two
|
| - two = two.getResult()
|
| -
|
| - add = wFD(self.realm.createGroup(u"foobar"))
|
| - yield add
|
| - add.getResult()
|
| -
|
| - groupone = wFD(one.join(u"foobar"))
|
| - yield groupone
|
| - groupone = groupone.getResult()
|
| -
|
| - grouptwo = wFD(two.join(u"foobar"))
|
| - yield grouptwo
|
| - grouptwo = grouptwo.getResult()
|
| -
|
| - msg = wFD(groupone.send({"text": "hello, monkeys"}))
|
| - yield msg
|
| - msg = msg.getResult()
|
| -
|
| - leave = wFD(groupone.leave())
|
| - yield leave
|
| - leave = leave.getResult()
|
| - testGroups = dG(testGroups)
|
|
|