| OLD | NEW |
| (Empty) |
| 1 # Copyright (c) 2001-2005 Twisted Matrix Laboratories. | |
| 2 # See LICENSE for details. | |
| 3 | |
| 4 from StringIO import StringIO | |
| 5 import time | |
| 6 | |
| 7 from twisted.trial import unittest | |
| 8 from twisted.trial.unittest import TestCase | |
| 9 from twisted.words.protocols import irc | |
| 10 from twisted.words.protocols.irc import IRCClient | |
| 11 from twisted.internet import protocol | |
| 12 | |
| 13 | |
| 14 class StringIOWithoutClosing(StringIO): | |
| 15 def close(self): | |
| 16 pass | |
| 17 | |
| 18 stringSubjects = [ | |
| 19 "Hello, this is a nice string with no complications.", | |
| 20 "xargs%(NUL)smight%(NUL)slike%(NUL)sthis" % {'NUL': irc.NUL }, | |
| 21 "embedded%(CR)snewline%(CR)s%(NL)sFUN%(NL)s" % {'CR': irc.CR, | |
| 22 'NL': irc.NL}, | |
| 23 "escape!%(X)s escape!%(M)s %(X)s%(X)sa %(M)s0" % {'X': irc.X_QUOTE, | |
| 24 'M': irc.M_QUOTE} | |
| 25 ] | |
| 26 | |
| 27 | |
| 28 class QuotingTest(unittest.TestCase): | |
| 29 def test_lowquoteSanity(self): | |
| 30 """Testing client-server level quote/dequote""" | |
| 31 for s in stringSubjects: | |
| 32 self.failUnlessEqual(s, irc.lowDequote(irc.lowQuote(s))) | |
| 33 | |
| 34 def test_ctcpquoteSanity(self): | |
| 35 """Testing CTCP message level quote/dequote""" | |
| 36 for s in stringSubjects: | |
| 37 self.failUnlessEqual(s, irc.ctcpDequote(irc.ctcpQuote(s))) | |
| 38 | |
| 39 | |
| 40 class IRCClientWithoutLogin(irc.IRCClient): | |
| 41 performLogin = 0 | |
| 42 | |
| 43 | |
| 44 class CTCPTest(unittest.TestCase): | |
| 45 def setUp(self): | |
| 46 self.file = StringIOWithoutClosing() | |
| 47 self.transport = protocol.FileWrapper(self.file) | |
| 48 self.client = IRCClientWithoutLogin() | |
| 49 self.client.makeConnection(self.transport) | |
| 50 | |
| 51 def test_ERRMSG(self): | |
| 52 """Testing CTCP query ERRMSG. | |
| 53 | |
| 54 Not because this is this is an especially important case in the | |
| 55 field, but it does go through the entire dispatch/decode/encode | |
| 56 process. | |
| 57 """ | |
| 58 | |
| 59 errQuery = (":nick!guy@over.there PRIVMSG #theChan :" | |
| 60 "%(X)cERRMSG t%(X)c%(EOL)s" | |
| 61 % {'X': irc.X_DELIM, | |
| 62 'EOL': irc.CR + irc.LF}) | |
| 63 | |
| 64 errReply = ("NOTICE nick :%(X)cERRMSG t :" | |
| 65 "No error has occoured.%(X)c%(EOL)s" | |
| 66 % {'X': irc.X_DELIM, | |
| 67 'EOL': irc.CR + irc.LF}) | |
| 68 | |
| 69 self.client.dataReceived(errQuery) | |
| 70 reply = self.file.getvalue() | |
| 71 | |
| 72 self.failUnlessEqual(errReply, reply) | |
| 73 | |
| 74 def tearDown(self): | |
| 75 self.transport.loseConnection() | |
| 76 self.client.connectionLost() | |
| 77 del self.client | |
| 78 del self.transport | |
| 79 | |
| 80 class NoticingClient(object, IRCClientWithoutLogin): | |
| 81 methods = { | |
| 82 'created': ('when',), | |
| 83 'yourHost': ('info',), | |
| 84 'myInfo': ('servername', 'version', 'umodes', 'cmodes'), | |
| 85 'luserClient': ('info',), | |
| 86 'bounce': ('info',), | |
| 87 'isupport': ('options',), | |
| 88 'luserChannels': ('channels',), | |
| 89 'luserOp': ('ops',), | |
| 90 'luserMe': ('info',), | |
| 91 'receivedMOTD': ('motd',), | |
| 92 | |
| 93 'privmsg': ('user', 'channel', 'message'), | |
| 94 'joined': ('channel',), | |
| 95 'left': ('channel',), | |
| 96 'noticed': ('user', 'channel', 'message'), | |
| 97 'modeChanged': ('user', 'channel', 'set', 'modes', 'args'), | |
| 98 'pong': ('user', 'secs'), | |
| 99 'signedOn': (), | |
| 100 'kickedFrom': ('channel', 'kicker', 'message'), | |
| 101 'nickChanged': ('nick',), | |
| 102 | |
| 103 'userJoined': ('user', 'channel'), | |
| 104 'userLeft': ('user', 'channel'), | |
| 105 'userKicked': ('user', 'channel', 'kicker', 'message'), | |
| 106 'action': ('user', 'channel', 'data'), | |
| 107 'topicUpdated': ('user', 'channel', 'newTopic'), | |
| 108 'userRenamed': ('oldname', 'newname')} | |
| 109 | |
| 110 def __init__(self, *a, **kw): | |
| 111 object.__init__(self) | |
| 112 self.calls = [] | |
| 113 | |
| 114 def __getattribute__(self, name): | |
| 115 if name.startswith('__') and name.endswith('__'): | |
| 116 return super(NoticingClient, self).__getattribute__(name) | |
| 117 try: | |
| 118 args = super(NoticingClient, self).__getattribute__('methods')[name] | |
| 119 except KeyError: | |
| 120 return super(NoticingClient, self).__getattribute__(name) | |
| 121 else: | |
| 122 return self.makeMethod(name, args) | |
| 123 | |
| 124 def makeMethod(self, fname, args): | |
| 125 def method(*a, **kw): | |
| 126 if len(a) > len(args): | |
| 127 raise TypeError("TypeError: %s() takes %d arguments " | |
| 128 "(%d given)" % (fname, len(args), len(a))) | |
| 129 for (name, value) in zip(args, a): | |
| 130 if name in kw: | |
| 131 raise TypeError("TypeError: %s() got multiple values " | |
| 132 "for keyword argument '%s'" % (fname, name)) | |
| 133 else: | |
| 134 kw[name] = value | |
| 135 if len(kw) != len(args): | |
| 136 raise TypeError("TypeError: %s() takes %d arguments " | |
| 137 "(%d given)" % (fname, len(args), len(a))) | |
| 138 self.calls.append((fname, kw)) | |
| 139 return method | |
| 140 | |
| 141 def pop(dict, key, default): | |
| 142 try: | |
| 143 value = dict[key] | |
| 144 except KeyError: | |
| 145 return default | |
| 146 else: | |
| 147 del dict[key] | |
| 148 return value | |
| 149 | |
| 150 class ModeTestCase(unittest.TestCase): | |
| 151 def setUp(self): | |
| 152 self.file = StringIOWithoutClosing() | |
| 153 self.transport = protocol.FileWrapper(self.file) | |
| 154 self.client = NoticingClient() | |
| 155 self.client.makeConnection(self.transport) | |
| 156 | |
| 157 def tearDown(self): | |
| 158 self.transport.loseConnection() | |
| 159 self.client.connectionLost() | |
| 160 del self.client | |
| 161 del self.transport | |
| 162 | |
| 163 def testModeChange(self): | |
| 164 message = ":ChanServ!ChanServ@services. MODE #tanstaafl +o exarkun\r\n" | |
| 165 self.client.dataReceived(message) | |
| 166 self.assertEquals( | |
| 167 self.client.calls, | |
| 168 [('modeChanged', {'user': "ChanServ!ChanServ@services.", | |
| 169 'channel': '#tanstaafl', | |
| 170 'set': True, | |
| 171 'modes': 'o', | |
| 172 'args': ('exarkun',)})]) | |
| 173 | |
| 174 def _serverTestImpl(self, code, msg, func, **kw): | |
| 175 host = pop(kw, 'host', 'server.host') | |
| 176 nick = pop(kw, 'nick', 'nickname') | |
| 177 args = pop(kw, 'args', '') | |
| 178 | |
| 179 message = (":" + | |
| 180 host + " " + | |
| 181 code + " " + | |
| 182 nick + " " + | |
| 183 args + " :" + | |
| 184 msg + "\r\n") | |
| 185 | |
| 186 self.client.dataReceived(message) | |
| 187 self.assertEquals( | |
| 188 self.client.calls, | |
| 189 [(func, kw)]) | |
| 190 | |
| 191 def testYourHost(self): | |
| 192 msg = "Your host is some.host[blah.blah/6667], running version server-ve
rsion-3" | |
| 193 self._serverTestImpl("002", msg, "yourHost", info=msg) | |
| 194 | |
| 195 def testCreated(self): | |
| 196 msg = "This server was cobbled together Fri Aug 13 18:00:25 UTC 2004" | |
| 197 self._serverTestImpl("003", msg, "created", when=msg) | |
| 198 | |
| 199 def testMyInfo(self): | |
| 200 msg = "server.host server-version abcDEF bcdEHI" | |
| 201 self._serverTestImpl("004", msg, "myInfo", | |
| 202 servername="server.host", | |
| 203 version="server-version", | |
| 204 umodes="abcDEF", | |
| 205 cmodes="bcdEHI") | |
| 206 | |
| 207 def testLuserClient(self): | |
| 208 msg = "There are 9227 victims and 9542 hiding on 24 servers" | |
| 209 self._serverTestImpl("251", msg, "luserClient", | |
| 210 info=msg) | |
| 211 | |
| 212 def testISupport(self): | |
| 213 args = ("MODES=4 CHANLIMIT=#:20 NICKLEN=16 USERLEN=10 HOSTLEN=63 " | |
| 214 "TOPICLEN=450 KICKLEN=450 CHANNELLEN=30 KEYLEN=23 CHANTYPES=# " | |
| 215 "PREFIX=(ov)@+ CASEMAPPING=ascii CAPAB IRCD=dancer") | |
| 216 msg = "are available on this server" | |
| 217 self._serverTestImpl("005", msg, "isupport", args=args, | |
| 218 options=['MODES=4', | |
| 219 'CHANLIMIT=#:20', | |
| 220 'NICKLEN=16', | |
| 221 'USERLEN=10', | |
| 222 'HOSTLEN=63', | |
| 223 'TOPICLEN=450', | |
| 224 'KICKLEN=450', | |
| 225 'CHANNELLEN=30', | |
| 226 'KEYLEN=23', | |
| 227 'CHANTYPES=#', | |
| 228 'PREFIX=(ov)@+', | |
| 229 'CASEMAPPING=ascii', | |
| 230 'CAPAB', | |
| 231 'IRCD=dancer']) | |
| 232 | |
| 233 def testBounce(self): | |
| 234 msg = "Try server some.host, port 321" | |
| 235 self._serverTestImpl("005", msg, "bounce", | |
| 236 info=msg) | |
| 237 | |
| 238 def testLuserChannels(self): | |
| 239 args = "7116" | |
| 240 msg = "channels formed" | |
| 241 self._serverTestImpl("254", msg, "luserChannels", args=args, | |
| 242 channels=int(args)) | |
| 243 | |
| 244 def testLuserOp(self): | |
| 245 args = "34" | |
| 246 msg = "flagged staff members" | |
| 247 self._serverTestImpl("252", msg, "luserOp", args=args, | |
| 248 ops=int(args)) | |
| 249 | |
| 250 def testLuserMe(self): | |
| 251 msg = "I have 1937 clients and 0 servers" | |
| 252 self._serverTestImpl("255", msg, "luserMe", | |
| 253 info=msg) | |
| 254 | |
| 255 def testMOTD(self): | |
| 256 lines = [ | |
| 257 ":host.name 375 nickname :- host.name Message of the Day -", | |
| 258 ":host.name 372 nickname :- Welcome to host.name", | |
| 259 ":host.name 376 nickname :End of /MOTD command."] | |
| 260 for L in lines: | |
| 261 self.assertEquals(self.client.calls, []) | |
| 262 self.client.dataReceived(L + '\r\n') | |
| 263 | |
| 264 self.assertEquals( | |
| 265 self.client.calls, | |
| 266 [("receivedMOTD", {"motd": ["host.name Message of the Day -", "Welco
me to host.name"]})]) | |
| 267 | |
| 268 | |
| 269 def _clientTestImpl(self, sender, group, type, msg, func, **kw): | |
| 270 ident = pop(kw, 'ident', 'ident') | |
| 271 host = pop(kw, 'host', 'host') | |
| 272 | |
| 273 wholeUser = sender + '!' + ident + '@' + host | |
| 274 message = (":" + | |
| 275 wholeUser + " " + | |
| 276 type + " " + | |
| 277 group + " :" + | |
| 278 msg + "\r\n") | |
| 279 self.client.dataReceived(message) | |
| 280 self.assertEquals( | |
| 281 self.client.calls, | |
| 282 [(func, kw)]) | |
| 283 self.client.calls = [] | |
| 284 | |
| 285 def testPrivmsg(self): | |
| 286 msg = "Tooty toot toot." | |
| 287 self._clientTestImpl("sender", "#group", "PRIVMSG", msg, "privmsg", | |
| 288 ident="ident", host="host", | |
| 289 # Expected results below | |
| 290 user="sender!ident@host", | |
| 291 channel="#group", | |
| 292 message=msg) | |
| 293 | |
| 294 self._clientTestImpl("sender", "recipient", "PRIVMSG", msg, "privmsg", | |
| 295 ident="ident", host="host", | |
| 296 # Expected results below | |
| 297 user="sender!ident@host", | |
| 298 channel="recipient", | |
| 299 message=msg) | |
| 300 | |
| 301 class BasicServerFunctionalityTestCase(unittest.TestCase): | |
| 302 def setUp(self): | |
| 303 self.f = StringIOWithoutClosing() | |
| 304 self.t = protocol.FileWrapper(self.f) | |
| 305 self.p = irc.IRC() | |
| 306 self.p.makeConnection(self.t) | |
| 307 | |
| 308 def check(self, s): | |
| 309 self.assertEquals(self.f.getvalue(), s) | |
| 310 | |
| 311 def testPrivmsg(self): | |
| 312 self.p.privmsg("this-is-sender", "this-is-recip", "this is message") | |
| 313 self.check(":this-is-sender PRIVMSG this-is-recip :this is message\r\n") | |
| 314 | |
| 315 def testNotice(self): | |
| 316 self.p.notice("this-is-sender", "this-is-recip", "this is notice") | |
| 317 self.check(":this-is-sender NOTICE this-is-recip :this is notice\r\n") | |
| 318 | |
| 319 def testAction(self): | |
| 320 self.p.action("this-is-sender", "this-is-recip", "this is action") | |
| 321 self.check(":this-is-sender ACTION this-is-recip :this is action\r\n") | |
| 322 | |
| 323 def testJoin(self): | |
| 324 self.p.join("this-person", "#this-channel") | |
| 325 self.check(":this-person JOIN #this-channel\r\n") | |
| 326 | |
| 327 def testPart(self): | |
| 328 self.p.part("this-person", "#that-channel") | |
| 329 self.check(":this-person PART #that-channel\r\n") | |
| 330 | |
| 331 def testWhois(self): | |
| 332 """ | |
| 333 Verify that a whois by the client receives the right protocol actions | |
| 334 from the server. | |
| 335 """ | |
| 336 timestamp = int(time.time()-100) | |
| 337 hostname = self.p.hostname | |
| 338 req = 'requesting-nick' | |
| 339 targ = 'target-nick' | |
| 340 self.p.whois(req, targ, 'target', 'host.com', | |
| 341 'Target User', 'irc.host.com', 'A fake server', False, | |
| 342 12, timestamp, ['#fakeusers', '#fakemisc']) | |
| 343 expected = '\r\n'.join([ | |
| 344 ':%(hostname)s 311 %(req)s %(targ)s target host.com * :Target User', | |
| 345 ':%(hostname)s 312 %(req)s %(targ)s irc.host.com :A fake server', | |
| 346 ':%(hostname)s 317 %(req)s %(targ)s 12 %(timestamp)s :seconds idle, signon time'
, | |
| 347 ':%(hostname)s 319 %(req)s %(targ)s :#fakeusers #fakemisc', | |
| 348 ':%(hostname)s 318 %(req)s %(targ)s :End of WHOIS list.', | |
| 349 '']) % dict(hostname=hostname, timestamp=timestamp, req=req, targ=targ) | |
| 350 self.check(expected) | |
| 351 | |
| 352 | |
| 353 class DummyClient(irc.IRCClient): | |
| 354 def __init__(self): | |
| 355 self.lines = [] | |
| 356 def sendLine(self, m): | |
| 357 self.lines.append(m) | |
| 358 | |
| 359 | |
| 360 class ClientMsgTests(unittest.TestCase): | |
| 361 def setUp(self): | |
| 362 self.client = DummyClient() | |
| 363 | |
| 364 def testSingleLine(self): | |
| 365 self.client.msg('foo', 'bar') | |
| 366 self.assertEquals(self.client.lines, ['PRIVMSG foo :bar']) | |
| 367 | |
| 368 def testDodgyMaxLength(self): | |
| 369 self.assertRaises(ValueError, self.client.msg, 'foo', 'bar', 0) | |
| 370 self.assertRaises(ValueError, self.client.msg, 'foo', 'bar', 3) | |
| 371 | |
| 372 def testMultipleLine(self): | |
| 373 maxLen = len('PRIVMSG foo :') + 3 + 2 # 2 for line endings | |
| 374 self.client.msg('foo', 'barbazbo', maxLen) | |
| 375 self.assertEquals(self.client.lines, ['PRIVMSG foo :bar', | |
| 376 'PRIVMSG foo :baz', | |
| 377 'PRIVMSG foo :bo']) | |
| 378 | |
| 379 def testSufficientWidth(self): | |
| 380 msg = 'barbazbo' | |
| 381 maxLen = len('PRIVMSG foo :%s' % (msg,)) + 2 | |
| 382 self.client.msg('foo', msg, maxLen) | |
| 383 self.assertEquals(self.client.lines, ['PRIVMSG foo :%s' % (msg,)]) | |
| 384 self.client.lines = [] | |
| 385 self.client.msg('foo', msg, maxLen-1) | |
| 386 self.assertEquals(2, len(self.client.lines)) | |
| 387 self.client.lines = [] | |
| 388 self.client.msg('foo', msg, maxLen+1) | |
| 389 self.assertEquals(1, len(self.client.lines)) | |
| 390 | |
| 391 def testSplitSanity(self): | |
| 392 # Whiteboxing | |
| 393 self.assertRaises(ValueError, irc.split, 'foo', -1) | |
| 394 self.assertRaises(ValueError, irc.split, 'foo', 0) | |
| 395 self.assertEquals([], irc.split('', 1)) | |
| 396 self.assertEquals([], irc.split('')) | |
| 397 | |
| 398 | |
| 399 class ClientTests(TestCase): | |
| 400 """ | |
| 401 Tests for the protocol-level behavior of IRCClient methods intended to | |
| 402 be called by application code. | |
| 403 """ | |
| 404 def setUp(self): | |
| 405 self.transport = StringIO() | |
| 406 self.protocol = IRCClient() | |
| 407 self.protocol.performLogin = False | |
| 408 self.protocol.makeConnection(self.transport) | |
| 409 | |
| 410 # Sanity check - we don't want anything to have happened at this | |
| 411 # point, since we're not in a test yet. | |
| 412 self.failIf(self.transport.getvalue()) | |
| 413 | |
| 414 | |
| 415 def test_register(self): | |
| 416 """ | |
| 417 Verify that the L{IRCClient.register} method sends a a USER command | |
| 418 with the correct arguments. | |
| 419 """ | |
| 420 username = 'testuser' | |
| 421 hostname = 'testhost' | |
| 422 servername = 'testserver' | |
| 423 self.protocol.realname = 'testname' | |
| 424 self.protocol.password = None | |
| 425 self.protocol.register(username, hostname, servername) | |
| 426 expected = [ | |
| 427 'NICK %s' % (username,), | |
| 428 'USER %s %s %s :%s' % ( | |
| 429 username, hostname, servername, self.protocol.realname), | |
| 430 ''] | |
| 431 self.assertEqual(self.transport.getvalue().split('\r\n'), expected) | |
| 432 | |
| 433 | |
| 434 def test_registerWithPassword(self): | |
| 435 """ | |
| 436 Verify that if the C{password} attribute of L{IRCClient} is not | |
| 437 C{None}, the C{register} method also authenticates using it. | |
| 438 """ | |
| 439 username = 'testuser' | |
| 440 hostname = 'testhost' | |
| 441 servername = 'testserver' | |
| 442 self.protocol.realname = 'testname' | |
| 443 self.protocol.password = 'testpass' | |
| 444 self.protocol.register(username, hostname, servername) | |
| 445 expected = [ | |
| 446 'PASS %s' % (self.protocol.password,), | |
| 447 'NICK %s' % (username,), | |
| 448 'USER %s %s %s :%s' % ( | |
| 449 username, hostname, servername, self.protocol.realname), | |
| 450 ''] | |
| 451 self.assertEqual(self.transport.getvalue().split('\r\n'), expected) | |
| OLD | NEW |