| Index: third_party/twisted_8_1/twisted/words/test/test_irc.py
|
| diff --git a/third_party/twisted_8_1/twisted/words/test/test_irc.py b/third_party/twisted_8_1/twisted/words/test/test_irc.py
|
| deleted file mode 100644
|
| index 87f15ae354418dc84d847bc8c612d9af1e1b2d66..0000000000000000000000000000000000000000
|
| --- a/third_party/twisted_8_1/twisted/words/test/test_irc.py
|
| +++ /dev/null
|
| @@ -1,451 +0,0 @@
|
| -# Copyright (c) 2001-2005 Twisted Matrix Laboratories.
|
| -# See LICENSE for details.
|
| -
|
| -from StringIO import StringIO
|
| -import time
|
| -
|
| -from twisted.trial import unittest
|
| -from twisted.trial.unittest import TestCase
|
| -from twisted.words.protocols import irc
|
| -from twisted.words.protocols.irc import IRCClient
|
| -from twisted.internet import protocol
|
| -
|
| -
|
| -class StringIOWithoutClosing(StringIO):
|
| - def close(self):
|
| - pass
|
| -
|
| -stringSubjects = [
|
| - "Hello, this is a nice string with no complications.",
|
| - "xargs%(NUL)smight%(NUL)slike%(NUL)sthis" % {'NUL': irc.NUL },
|
| - "embedded%(CR)snewline%(CR)s%(NL)sFUN%(NL)s" % {'CR': irc.CR,
|
| - 'NL': irc.NL},
|
| - "escape!%(X)s escape!%(M)s %(X)s%(X)sa %(M)s0" % {'X': irc.X_QUOTE,
|
| - 'M': irc.M_QUOTE}
|
| - ]
|
| -
|
| -
|
| -class QuotingTest(unittest.TestCase):
|
| - def test_lowquoteSanity(self):
|
| - """Testing client-server level quote/dequote"""
|
| - for s in stringSubjects:
|
| - self.failUnlessEqual(s, irc.lowDequote(irc.lowQuote(s)))
|
| -
|
| - def test_ctcpquoteSanity(self):
|
| - """Testing CTCP message level quote/dequote"""
|
| - for s in stringSubjects:
|
| - self.failUnlessEqual(s, irc.ctcpDequote(irc.ctcpQuote(s)))
|
| -
|
| -
|
| -class IRCClientWithoutLogin(irc.IRCClient):
|
| - performLogin = 0
|
| -
|
| -
|
| -class CTCPTest(unittest.TestCase):
|
| - def setUp(self):
|
| - self.file = StringIOWithoutClosing()
|
| - self.transport = protocol.FileWrapper(self.file)
|
| - self.client = IRCClientWithoutLogin()
|
| - self.client.makeConnection(self.transport)
|
| -
|
| - def test_ERRMSG(self):
|
| - """Testing CTCP query ERRMSG.
|
| -
|
| - Not because this is this is an especially important case in the
|
| - field, but it does go through the entire dispatch/decode/encode
|
| - process.
|
| - """
|
| -
|
| - errQuery = (":nick!guy@over.there PRIVMSG #theChan :"
|
| - "%(X)cERRMSG t%(X)c%(EOL)s"
|
| - % {'X': irc.X_DELIM,
|
| - 'EOL': irc.CR + irc.LF})
|
| -
|
| - errReply = ("NOTICE nick :%(X)cERRMSG t :"
|
| - "No error has occoured.%(X)c%(EOL)s"
|
| - % {'X': irc.X_DELIM,
|
| - 'EOL': irc.CR + irc.LF})
|
| -
|
| - self.client.dataReceived(errQuery)
|
| - reply = self.file.getvalue()
|
| -
|
| - self.failUnlessEqual(errReply, reply)
|
| -
|
| - def tearDown(self):
|
| - self.transport.loseConnection()
|
| - self.client.connectionLost()
|
| - del self.client
|
| - del self.transport
|
| -
|
| -class NoticingClient(object, IRCClientWithoutLogin):
|
| - methods = {
|
| - 'created': ('when',),
|
| - 'yourHost': ('info',),
|
| - 'myInfo': ('servername', 'version', 'umodes', 'cmodes'),
|
| - 'luserClient': ('info',),
|
| - 'bounce': ('info',),
|
| - 'isupport': ('options',),
|
| - 'luserChannels': ('channels',),
|
| - 'luserOp': ('ops',),
|
| - 'luserMe': ('info',),
|
| - 'receivedMOTD': ('motd',),
|
| -
|
| - 'privmsg': ('user', 'channel', 'message'),
|
| - 'joined': ('channel',),
|
| - 'left': ('channel',),
|
| - 'noticed': ('user', 'channel', 'message'),
|
| - 'modeChanged': ('user', 'channel', 'set', 'modes', 'args'),
|
| - 'pong': ('user', 'secs'),
|
| - 'signedOn': (),
|
| - 'kickedFrom': ('channel', 'kicker', 'message'),
|
| - 'nickChanged': ('nick',),
|
| -
|
| - 'userJoined': ('user', 'channel'),
|
| - 'userLeft': ('user', 'channel'),
|
| - 'userKicked': ('user', 'channel', 'kicker', 'message'),
|
| - 'action': ('user', 'channel', 'data'),
|
| - 'topicUpdated': ('user', 'channel', 'newTopic'),
|
| - 'userRenamed': ('oldname', 'newname')}
|
| -
|
| - def __init__(self, *a, **kw):
|
| - object.__init__(self)
|
| - self.calls = []
|
| -
|
| - def __getattribute__(self, name):
|
| - if name.startswith('__') and name.endswith('__'):
|
| - return super(NoticingClient, self).__getattribute__(name)
|
| - try:
|
| - args = super(NoticingClient, self).__getattribute__('methods')[name]
|
| - except KeyError:
|
| - return super(NoticingClient, self).__getattribute__(name)
|
| - else:
|
| - return self.makeMethod(name, args)
|
| -
|
| - def makeMethod(self, fname, args):
|
| - def method(*a, **kw):
|
| - if len(a) > len(args):
|
| - raise TypeError("TypeError: %s() takes %d arguments "
|
| - "(%d given)" % (fname, len(args), len(a)))
|
| - for (name, value) in zip(args, a):
|
| - if name in kw:
|
| - raise TypeError("TypeError: %s() got multiple values "
|
| - "for keyword argument '%s'" % (fname, name))
|
| - else:
|
| - kw[name] = value
|
| - if len(kw) != len(args):
|
| - raise TypeError("TypeError: %s() takes %d arguments "
|
| - "(%d given)" % (fname, len(args), len(a)))
|
| - self.calls.append((fname, kw))
|
| - return method
|
| -
|
| -def pop(dict, key, default):
|
| - try:
|
| - value = dict[key]
|
| - except KeyError:
|
| - return default
|
| - else:
|
| - del dict[key]
|
| - return value
|
| -
|
| -class ModeTestCase(unittest.TestCase):
|
| - def setUp(self):
|
| - self.file = StringIOWithoutClosing()
|
| - self.transport = protocol.FileWrapper(self.file)
|
| - self.client = NoticingClient()
|
| - self.client.makeConnection(self.transport)
|
| -
|
| - def tearDown(self):
|
| - self.transport.loseConnection()
|
| - self.client.connectionLost()
|
| - del self.client
|
| - del self.transport
|
| -
|
| - def testModeChange(self):
|
| - message = ":ChanServ!ChanServ@services. MODE #tanstaafl +o exarkun\r\n"
|
| - self.client.dataReceived(message)
|
| - self.assertEquals(
|
| - self.client.calls,
|
| - [('modeChanged', {'user': "ChanServ!ChanServ@services.",
|
| - 'channel': '#tanstaafl',
|
| - 'set': True,
|
| - 'modes': 'o',
|
| - 'args': ('exarkun',)})])
|
| -
|
| - def _serverTestImpl(self, code, msg, func, **kw):
|
| - host = pop(kw, 'host', 'server.host')
|
| - nick = pop(kw, 'nick', 'nickname')
|
| - args = pop(kw, 'args', '')
|
| -
|
| - message = (":" +
|
| - host + " " +
|
| - code + " " +
|
| - nick + " " +
|
| - args + " :" +
|
| - msg + "\r\n")
|
| -
|
| - self.client.dataReceived(message)
|
| - self.assertEquals(
|
| - self.client.calls,
|
| - [(func, kw)])
|
| -
|
| - def testYourHost(self):
|
| - msg = "Your host is some.host[blah.blah/6667], running version server-version-3"
|
| - self._serverTestImpl("002", msg, "yourHost", info=msg)
|
| -
|
| - def testCreated(self):
|
| - msg = "This server was cobbled together Fri Aug 13 18:00:25 UTC 2004"
|
| - self._serverTestImpl("003", msg, "created", when=msg)
|
| -
|
| - def testMyInfo(self):
|
| - msg = "server.host server-version abcDEF bcdEHI"
|
| - self._serverTestImpl("004", msg, "myInfo",
|
| - servername="server.host",
|
| - version="server-version",
|
| - umodes="abcDEF",
|
| - cmodes="bcdEHI")
|
| -
|
| - def testLuserClient(self):
|
| - msg = "There are 9227 victims and 9542 hiding on 24 servers"
|
| - self._serverTestImpl("251", msg, "luserClient",
|
| - info=msg)
|
| -
|
| - def testISupport(self):
|
| - args = ("MODES=4 CHANLIMIT=#:20 NICKLEN=16 USERLEN=10 HOSTLEN=63 "
|
| - "TOPICLEN=450 KICKLEN=450 CHANNELLEN=30 KEYLEN=23 CHANTYPES=# "
|
| - "PREFIX=(ov)@+ CASEMAPPING=ascii CAPAB IRCD=dancer")
|
| - msg = "are available on this server"
|
| - self._serverTestImpl("005", msg, "isupport", args=args,
|
| - options=['MODES=4',
|
| - 'CHANLIMIT=#:20',
|
| - 'NICKLEN=16',
|
| - 'USERLEN=10',
|
| - 'HOSTLEN=63',
|
| - 'TOPICLEN=450',
|
| - 'KICKLEN=450',
|
| - 'CHANNELLEN=30',
|
| - 'KEYLEN=23',
|
| - 'CHANTYPES=#',
|
| - 'PREFIX=(ov)@+',
|
| - 'CASEMAPPING=ascii',
|
| - 'CAPAB',
|
| - 'IRCD=dancer'])
|
| -
|
| - def testBounce(self):
|
| - msg = "Try server some.host, port 321"
|
| - self._serverTestImpl("005", msg, "bounce",
|
| - info=msg)
|
| -
|
| - def testLuserChannels(self):
|
| - args = "7116"
|
| - msg = "channels formed"
|
| - self._serverTestImpl("254", msg, "luserChannels", args=args,
|
| - channels=int(args))
|
| -
|
| - def testLuserOp(self):
|
| - args = "34"
|
| - msg = "flagged staff members"
|
| - self._serverTestImpl("252", msg, "luserOp", args=args,
|
| - ops=int(args))
|
| -
|
| - def testLuserMe(self):
|
| - msg = "I have 1937 clients and 0 servers"
|
| - self._serverTestImpl("255", msg, "luserMe",
|
| - info=msg)
|
| -
|
| - def testMOTD(self):
|
| - lines = [
|
| - ":host.name 375 nickname :- host.name Message of the Day -",
|
| - ":host.name 372 nickname :- Welcome to host.name",
|
| - ":host.name 376 nickname :End of /MOTD command."]
|
| - for L in lines:
|
| - self.assertEquals(self.client.calls, [])
|
| - self.client.dataReceived(L + '\r\n')
|
| -
|
| - self.assertEquals(
|
| - self.client.calls,
|
| - [("receivedMOTD", {"motd": ["host.name Message of the Day -", "Welcome to host.name"]})])
|
| -
|
| -
|
| - def _clientTestImpl(self, sender, group, type, msg, func, **kw):
|
| - ident = pop(kw, 'ident', 'ident')
|
| - host = pop(kw, 'host', 'host')
|
| -
|
| - wholeUser = sender + '!' + ident + '@' + host
|
| - message = (":" +
|
| - wholeUser + " " +
|
| - type + " " +
|
| - group + " :" +
|
| - msg + "\r\n")
|
| - self.client.dataReceived(message)
|
| - self.assertEquals(
|
| - self.client.calls,
|
| - [(func, kw)])
|
| - self.client.calls = []
|
| -
|
| - def testPrivmsg(self):
|
| - msg = "Tooty toot toot."
|
| - self._clientTestImpl("sender", "#group", "PRIVMSG", msg, "privmsg",
|
| - ident="ident", host="host",
|
| - # Expected results below
|
| - user="sender!ident@host",
|
| - channel="#group",
|
| - message=msg)
|
| -
|
| - self._clientTestImpl("sender", "recipient", "PRIVMSG", msg, "privmsg",
|
| - ident="ident", host="host",
|
| - # Expected results below
|
| - user="sender!ident@host",
|
| - channel="recipient",
|
| - message=msg)
|
| -
|
| -class BasicServerFunctionalityTestCase(unittest.TestCase):
|
| - def setUp(self):
|
| - self.f = StringIOWithoutClosing()
|
| - self.t = protocol.FileWrapper(self.f)
|
| - self.p = irc.IRC()
|
| - self.p.makeConnection(self.t)
|
| -
|
| - def check(self, s):
|
| - self.assertEquals(self.f.getvalue(), s)
|
| -
|
| - def testPrivmsg(self):
|
| - self.p.privmsg("this-is-sender", "this-is-recip", "this is message")
|
| - self.check(":this-is-sender PRIVMSG this-is-recip :this is message\r\n")
|
| -
|
| - def testNotice(self):
|
| - self.p.notice("this-is-sender", "this-is-recip", "this is notice")
|
| - self.check(":this-is-sender NOTICE this-is-recip :this is notice\r\n")
|
| -
|
| - def testAction(self):
|
| - self.p.action("this-is-sender", "this-is-recip", "this is action")
|
| - self.check(":this-is-sender ACTION this-is-recip :this is action\r\n")
|
| -
|
| - def testJoin(self):
|
| - self.p.join("this-person", "#this-channel")
|
| - self.check(":this-person JOIN #this-channel\r\n")
|
| -
|
| - def testPart(self):
|
| - self.p.part("this-person", "#that-channel")
|
| - self.check(":this-person PART #that-channel\r\n")
|
| -
|
| - def testWhois(self):
|
| - """
|
| - Verify that a whois by the client receives the right protocol actions
|
| - from the server.
|
| - """
|
| - timestamp = int(time.time()-100)
|
| - hostname = self.p.hostname
|
| - req = 'requesting-nick'
|
| - targ = 'target-nick'
|
| - self.p.whois(req, targ, 'target', 'host.com',
|
| - 'Target User', 'irc.host.com', 'A fake server', False,
|
| - 12, timestamp, ['#fakeusers', '#fakemisc'])
|
| - expected = '\r\n'.join([
|
| -':%(hostname)s 311 %(req)s %(targ)s target host.com * :Target User',
|
| -':%(hostname)s 312 %(req)s %(targ)s irc.host.com :A fake server',
|
| -':%(hostname)s 317 %(req)s %(targ)s 12 %(timestamp)s :seconds idle, signon time',
|
| -':%(hostname)s 319 %(req)s %(targ)s :#fakeusers #fakemisc',
|
| -':%(hostname)s 318 %(req)s %(targ)s :End of WHOIS list.',
|
| -'']) % dict(hostname=hostname, timestamp=timestamp, req=req, targ=targ)
|
| - self.check(expected)
|
| -
|
| -
|
| -class DummyClient(irc.IRCClient):
|
| - def __init__(self):
|
| - self.lines = []
|
| - def sendLine(self, m):
|
| - self.lines.append(m)
|
| -
|
| -
|
| -class ClientMsgTests(unittest.TestCase):
|
| - def setUp(self):
|
| - self.client = DummyClient()
|
| -
|
| - def testSingleLine(self):
|
| - self.client.msg('foo', 'bar')
|
| - self.assertEquals(self.client.lines, ['PRIVMSG foo :bar'])
|
| -
|
| - def testDodgyMaxLength(self):
|
| - self.assertRaises(ValueError, self.client.msg, 'foo', 'bar', 0)
|
| - self.assertRaises(ValueError, self.client.msg, 'foo', 'bar', 3)
|
| -
|
| - def testMultipleLine(self):
|
| - maxLen = len('PRIVMSG foo :') + 3 + 2 # 2 for line endings
|
| - self.client.msg('foo', 'barbazbo', maxLen)
|
| - self.assertEquals(self.client.lines, ['PRIVMSG foo :bar',
|
| - 'PRIVMSG foo :baz',
|
| - 'PRIVMSG foo :bo'])
|
| -
|
| - def testSufficientWidth(self):
|
| - msg = 'barbazbo'
|
| - maxLen = len('PRIVMSG foo :%s' % (msg,)) + 2
|
| - self.client.msg('foo', msg, maxLen)
|
| - self.assertEquals(self.client.lines, ['PRIVMSG foo :%s' % (msg,)])
|
| - self.client.lines = []
|
| - self.client.msg('foo', msg, maxLen-1)
|
| - self.assertEquals(2, len(self.client.lines))
|
| - self.client.lines = []
|
| - self.client.msg('foo', msg, maxLen+1)
|
| - self.assertEquals(1, len(self.client.lines))
|
| -
|
| - def testSplitSanity(self):
|
| - # Whiteboxing
|
| - self.assertRaises(ValueError, irc.split, 'foo', -1)
|
| - self.assertRaises(ValueError, irc.split, 'foo', 0)
|
| - self.assertEquals([], irc.split('', 1))
|
| - self.assertEquals([], irc.split(''))
|
| -
|
| -
|
| -class ClientTests(TestCase):
|
| - """
|
| - Tests for the protocol-level behavior of IRCClient methods intended to
|
| - be called by application code.
|
| - """
|
| - def setUp(self):
|
| - self.transport = StringIO()
|
| - self.protocol = IRCClient()
|
| - self.protocol.performLogin = False
|
| - self.protocol.makeConnection(self.transport)
|
| -
|
| - # Sanity check - we don't want anything to have happened at this
|
| - # point, since we're not in a test yet.
|
| - self.failIf(self.transport.getvalue())
|
| -
|
| -
|
| - def test_register(self):
|
| - """
|
| - Verify that the L{IRCClient.register} method sends a a USER command
|
| - with the correct arguments.
|
| - """
|
| - username = 'testuser'
|
| - hostname = 'testhost'
|
| - servername = 'testserver'
|
| - self.protocol.realname = 'testname'
|
| - self.protocol.password = None
|
| - self.protocol.register(username, hostname, servername)
|
| - expected = [
|
| - 'NICK %s' % (username,),
|
| - 'USER %s %s %s :%s' % (
|
| - username, hostname, servername, self.protocol.realname),
|
| - '']
|
| - self.assertEqual(self.transport.getvalue().split('\r\n'), expected)
|
| -
|
| -
|
| - def test_registerWithPassword(self):
|
| - """
|
| - Verify that if the C{password} attribute of L{IRCClient} is not
|
| - C{None}, the C{register} method also authenticates using it.
|
| - """
|
| - username = 'testuser'
|
| - hostname = 'testhost'
|
| - servername = 'testserver'
|
| - self.protocol.realname = 'testname'
|
| - self.protocol.password = 'testpass'
|
| - self.protocol.register(username, hostname, servername)
|
| - expected = [
|
| - 'PASS %s' % (self.protocol.password,),
|
| - 'NICK %s' % (username,),
|
| - 'USER %s %s %s :%s' % (
|
| - username, hostname, servername, self.protocol.realname),
|
| - '']
|
| - self.assertEqual(self.transport.getvalue().split('\r\n'), expected)
|
|
|