OLD | NEW |
| (Empty) |
1 # Copyright (c) 2001-2005 Twisted Matrix Laboratories. | |
2 # See LICENSE for details. | |
3 | |
4 from StringIO import StringIO | |
5 import time | |
6 | |
7 from twisted.trial import unittest | |
8 from twisted.trial.unittest import TestCase | |
9 from twisted.words.protocols import irc | |
10 from twisted.words.protocols.irc import IRCClient | |
11 from twisted.internet import protocol | |
12 | |
13 | |
14 class StringIOWithoutClosing(StringIO): | |
15 def close(self): | |
16 pass | |
17 | |
18 stringSubjects = [ | |
19 "Hello, this is a nice string with no complications.", | |
20 "xargs%(NUL)smight%(NUL)slike%(NUL)sthis" % {'NUL': irc.NUL }, | |
21 "embedded%(CR)snewline%(CR)s%(NL)sFUN%(NL)s" % {'CR': irc.CR, | |
22 'NL': irc.NL}, | |
23 "escape!%(X)s escape!%(M)s %(X)s%(X)sa %(M)s0" % {'X': irc.X_QUOTE, | |
24 'M': irc.M_QUOTE} | |
25 ] | |
26 | |
27 | |
28 class QuotingTest(unittest.TestCase): | |
29 def test_lowquoteSanity(self): | |
30 """Testing client-server level quote/dequote""" | |
31 for s in stringSubjects: | |
32 self.failUnlessEqual(s, irc.lowDequote(irc.lowQuote(s))) | |
33 | |
34 def test_ctcpquoteSanity(self): | |
35 """Testing CTCP message level quote/dequote""" | |
36 for s in stringSubjects: | |
37 self.failUnlessEqual(s, irc.ctcpDequote(irc.ctcpQuote(s))) | |
38 | |
39 | |
40 class IRCClientWithoutLogin(irc.IRCClient): | |
41 performLogin = 0 | |
42 | |
43 | |
44 class CTCPTest(unittest.TestCase): | |
45 def setUp(self): | |
46 self.file = StringIOWithoutClosing() | |
47 self.transport = protocol.FileWrapper(self.file) | |
48 self.client = IRCClientWithoutLogin() | |
49 self.client.makeConnection(self.transport) | |
50 | |
51 def test_ERRMSG(self): | |
52 """Testing CTCP query ERRMSG. | |
53 | |
54 Not because this is this is an especially important case in the | |
55 field, but it does go through the entire dispatch/decode/encode | |
56 process. | |
57 """ | |
58 | |
59 errQuery = (":nick!guy@over.there PRIVMSG #theChan :" | |
60 "%(X)cERRMSG t%(X)c%(EOL)s" | |
61 % {'X': irc.X_DELIM, | |
62 'EOL': irc.CR + irc.LF}) | |
63 | |
64 errReply = ("NOTICE nick :%(X)cERRMSG t :" | |
65 "No error has occoured.%(X)c%(EOL)s" | |
66 % {'X': irc.X_DELIM, | |
67 'EOL': irc.CR + irc.LF}) | |
68 | |
69 self.client.dataReceived(errQuery) | |
70 reply = self.file.getvalue() | |
71 | |
72 self.failUnlessEqual(errReply, reply) | |
73 | |
74 def tearDown(self): | |
75 self.transport.loseConnection() | |
76 self.client.connectionLost() | |
77 del self.client | |
78 del self.transport | |
79 | |
80 class NoticingClient(object, IRCClientWithoutLogin): | |
81 methods = { | |
82 'created': ('when',), | |
83 'yourHost': ('info',), | |
84 'myInfo': ('servername', 'version', 'umodes', 'cmodes'), | |
85 'luserClient': ('info',), | |
86 'bounce': ('info',), | |
87 'isupport': ('options',), | |
88 'luserChannels': ('channels',), | |
89 'luserOp': ('ops',), | |
90 'luserMe': ('info',), | |
91 'receivedMOTD': ('motd',), | |
92 | |
93 'privmsg': ('user', 'channel', 'message'), | |
94 'joined': ('channel',), | |
95 'left': ('channel',), | |
96 'noticed': ('user', 'channel', 'message'), | |
97 'modeChanged': ('user', 'channel', 'set', 'modes', 'args'), | |
98 'pong': ('user', 'secs'), | |
99 'signedOn': (), | |
100 'kickedFrom': ('channel', 'kicker', 'message'), | |
101 'nickChanged': ('nick',), | |
102 | |
103 'userJoined': ('user', 'channel'), | |
104 'userLeft': ('user', 'channel'), | |
105 'userKicked': ('user', 'channel', 'kicker', 'message'), | |
106 'action': ('user', 'channel', 'data'), | |
107 'topicUpdated': ('user', 'channel', 'newTopic'), | |
108 'userRenamed': ('oldname', 'newname')} | |
109 | |
110 def __init__(self, *a, **kw): | |
111 object.__init__(self) | |
112 self.calls = [] | |
113 | |
114 def __getattribute__(self, name): | |
115 if name.startswith('__') and name.endswith('__'): | |
116 return super(NoticingClient, self).__getattribute__(name) | |
117 try: | |
118 args = super(NoticingClient, self).__getattribute__('methods')[name] | |
119 except KeyError: | |
120 return super(NoticingClient, self).__getattribute__(name) | |
121 else: | |
122 return self.makeMethod(name, args) | |
123 | |
124 def makeMethod(self, fname, args): | |
125 def method(*a, **kw): | |
126 if len(a) > len(args): | |
127 raise TypeError("TypeError: %s() takes %d arguments " | |
128 "(%d given)" % (fname, len(args), len(a))) | |
129 for (name, value) in zip(args, a): | |
130 if name in kw: | |
131 raise TypeError("TypeError: %s() got multiple values " | |
132 "for keyword argument '%s'" % (fname, name)) | |
133 else: | |
134 kw[name] = value | |
135 if len(kw) != len(args): | |
136 raise TypeError("TypeError: %s() takes %d arguments " | |
137 "(%d given)" % (fname, len(args), len(a))) | |
138 self.calls.append((fname, kw)) | |
139 return method | |
140 | |
141 def pop(dict, key, default): | |
142 try: | |
143 value = dict[key] | |
144 except KeyError: | |
145 return default | |
146 else: | |
147 del dict[key] | |
148 return value | |
149 | |
150 class ModeTestCase(unittest.TestCase): | |
151 def setUp(self): | |
152 self.file = StringIOWithoutClosing() | |
153 self.transport = protocol.FileWrapper(self.file) | |
154 self.client = NoticingClient() | |
155 self.client.makeConnection(self.transport) | |
156 | |
157 def tearDown(self): | |
158 self.transport.loseConnection() | |
159 self.client.connectionLost() | |
160 del self.client | |
161 del self.transport | |
162 | |
163 def testModeChange(self): | |
164 message = ":ChanServ!ChanServ@services. MODE #tanstaafl +o exarkun\r\n" | |
165 self.client.dataReceived(message) | |
166 self.assertEquals( | |
167 self.client.calls, | |
168 [('modeChanged', {'user': "ChanServ!ChanServ@services.", | |
169 'channel': '#tanstaafl', | |
170 'set': True, | |
171 'modes': 'o', | |
172 'args': ('exarkun',)})]) | |
173 | |
174 def _serverTestImpl(self, code, msg, func, **kw): | |
175 host = pop(kw, 'host', 'server.host') | |
176 nick = pop(kw, 'nick', 'nickname') | |
177 args = pop(kw, 'args', '') | |
178 | |
179 message = (":" + | |
180 host + " " + | |
181 code + " " + | |
182 nick + " " + | |
183 args + " :" + | |
184 msg + "\r\n") | |
185 | |
186 self.client.dataReceived(message) | |
187 self.assertEquals( | |
188 self.client.calls, | |
189 [(func, kw)]) | |
190 | |
191 def testYourHost(self): | |
192 msg = "Your host is some.host[blah.blah/6667], running version server-ve
rsion-3" | |
193 self._serverTestImpl("002", msg, "yourHost", info=msg) | |
194 | |
195 def testCreated(self): | |
196 msg = "This server was cobbled together Fri Aug 13 18:00:25 UTC 2004" | |
197 self._serverTestImpl("003", msg, "created", when=msg) | |
198 | |
199 def testMyInfo(self): | |
200 msg = "server.host server-version abcDEF bcdEHI" | |
201 self._serverTestImpl("004", msg, "myInfo", | |
202 servername="server.host", | |
203 version="server-version", | |
204 umodes="abcDEF", | |
205 cmodes="bcdEHI") | |
206 | |
207 def testLuserClient(self): | |
208 msg = "There are 9227 victims and 9542 hiding on 24 servers" | |
209 self._serverTestImpl("251", msg, "luserClient", | |
210 info=msg) | |
211 | |
212 def testISupport(self): | |
213 args = ("MODES=4 CHANLIMIT=#:20 NICKLEN=16 USERLEN=10 HOSTLEN=63 " | |
214 "TOPICLEN=450 KICKLEN=450 CHANNELLEN=30 KEYLEN=23 CHANTYPES=# " | |
215 "PREFIX=(ov)@+ CASEMAPPING=ascii CAPAB IRCD=dancer") | |
216 msg = "are available on this server" | |
217 self._serverTestImpl("005", msg, "isupport", args=args, | |
218 options=['MODES=4', | |
219 'CHANLIMIT=#:20', | |
220 'NICKLEN=16', | |
221 'USERLEN=10', | |
222 'HOSTLEN=63', | |
223 'TOPICLEN=450', | |
224 'KICKLEN=450', | |
225 'CHANNELLEN=30', | |
226 'KEYLEN=23', | |
227 'CHANTYPES=#', | |
228 'PREFIX=(ov)@+', | |
229 'CASEMAPPING=ascii', | |
230 'CAPAB', | |
231 'IRCD=dancer']) | |
232 | |
233 def testBounce(self): | |
234 msg = "Try server some.host, port 321" | |
235 self._serverTestImpl("005", msg, "bounce", | |
236 info=msg) | |
237 | |
238 def testLuserChannels(self): | |
239 args = "7116" | |
240 msg = "channels formed" | |
241 self._serverTestImpl("254", msg, "luserChannels", args=args, | |
242 channels=int(args)) | |
243 | |
244 def testLuserOp(self): | |
245 args = "34" | |
246 msg = "flagged staff members" | |
247 self._serverTestImpl("252", msg, "luserOp", args=args, | |
248 ops=int(args)) | |
249 | |
250 def testLuserMe(self): | |
251 msg = "I have 1937 clients and 0 servers" | |
252 self._serverTestImpl("255", msg, "luserMe", | |
253 info=msg) | |
254 | |
255 def testMOTD(self): | |
256 lines = [ | |
257 ":host.name 375 nickname :- host.name Message of the Day -", | |
258 ":host.name 372 nickname :- Welcome to host.name", | |
259 ":host.name 376 nickname :End of /MOTD command."] | |
260 for L in lines: | |
261 self.assertEquals(self.client.calls, []) | |
262 self.client.dataReceived(L + '\r\n') | |
263 | |
264 self.assertEquals( | |
265 self.client.calls, | |
266 [("receivedMOTD", {"motd": ["host.name Message of the Day -", "Welco
me to host.name"]})]) | |
267 | |
268 | |
269 def _clientTestImpl(self, sender, group, type, msg, func, **kw): | |
270 ident = pop(kw, 'ident', 'ident') | |
271 host = pop(kw, 'host', 'host') | |
272 | |
273 wholeUser = sender + '!' + ident + '@' + host | |
274 message = (":" + | |
275 wholeUser + " " + | |
276 type + " " + | |
277 group + " :" + | |
278 msg + "\r\n") | |
279 self.client.dataReceived(message) | |
280 self.assertEquals( | |
281 self.client.calls, | |
282 [(func, kw)]) | |
283 self.client.calls = [] | |
284 | |
285 def testPrivmsg(self): | |
286 msg = "Tooty toot toot." | |
287 self._clientTestImpl("sender", "#group", "PRIVMSG", msg, "privmsg", | |
288 ident="ident", host="host", | |
289 # Expected results below | |
290 user="sender!ident@host", | |
291 channel="#group", | |
292 message=msg) | |
293 | |
294 self._clientTestImpl("sender", "recipient", "PRIVMSG", msg, "privmsg", | |
295 ident="ident", host="host", | |
296 # Expected results below | |
297 user="sender!ident@host", | |
298 channel="recipient", | |
299 message=msg) | |
300 | |
301 class BasicServerFunctionalityTestCase(unittest.TestCase): | |
302 def setUp(self): | |
303 self.f = StringIOWithoutClosing() | |
304 self.t = protocol.FileWrapper(self.f) | |
305 self.p = irc.IRC() | |
306 self.p.makeConnection(self.t) | |
307 | |
308 def check(self, s): | |
309 self.assertEquals(self.f.getvalue(), s) | |
310 | |
311 def testPrivmsg(self): | |
312 self.p.privmsg("this-is-sender", "this-is-recip", "this is message") | |
313 self.check(":this-is-sender PRIVMSG this-is-recip :this is message\r\n") | |
314 | |
315 def testNotice(self): | |
316 self.p.notice("this-is-sender", "this-is-recip", "this is notice") | |
317 self.check(":this-is-sender NOTICE this-is-recip :this is notice\r\n") | |
318 | |
319 def testAction(self): | |
320 self.p.action("this-is-sender", "this-is-recip", "this is action") | |
321 self.check(":this-is-sender ACTION this-is-recip :this is action\r\n") | |
322 | |
323 def testJoin(self): | |
324 self.p.join("this-person", "#this-channel") | |
325 self.check(":this-person JOIN #this-channel\r\n") | |
326 | |
327 def testPart(self): | |
328 self.p.part("this-person", "#that-channel") | |
329 self.check(":this-person PART #that-channel\r\n") | |
330 | |
331 def testWhois(self): | |
332 """ | |
333 Verify that a whois by the client receives the right protocol actions | |
334 from the server. | |
335 """ | |
336 timestamp = int(time.time()-100) | |
337 hostname = self.p.hostname | |
338 req = 'requesting-nick' | |
339 targ = 'target-nick' | |
340 self.p.whois(req, targ, 'target', 'host.com', | |
341 'Target User', 'irc.host.com', 'A fake server', False, | |
342 12, timestamp, ['#fakeusers', '#fakemisc']) | |
343 expected = '\r\n'.join([ | |
344 ':%(hostname)s 311 %(req)s %(targ)s target host.com * :Target User', | |
345 ':%(hostname)s 312 %(req)s %(targ)s irc.host.com :A fake server', | |
346 ':%(hostname)s 317 %(req)s %(targ)s 12 %(timestamp)s :seconds idle, signon time'
, | |
347 ':%(hostname)s 319 %(req)s %(targ)s :#fakeusers #fakemisc', | |
348 ':%(hostname)s 318 %(req)s %(targ)s :End of WHOIS list.', | |
349 '']) % dict(hostname=hostname, timestamp=timestamp, req=req, targ=targ) | |
350 self.check(expected) | |
351 | |
352 | |
353 class DummyClient(irc.IRCClient): | |
354 def __init__(self): | |
355 self.lines = [] | |
356 def sendLine(self, m): | |
357 self.lines.append(m) | |
358 | |
359 | |
360 class ClientMsgTests(unittest.TestCase): | |
361 def setUp(self): | |
362 self.client = DummyClient() | |
363 | |
364 def testSingleLine(self): | |
365 self.client.msg('foo', 'bar') | |
366 self.assertEquals(self.client.lines, ['PRIVMSG foo :bar']) | |
367 | |
368 def testDodgyMaxLength(self): | |
369 self.assertRaises(ValueError, self.client.msg, 'foo', 'bar', 0) | |
370 self.assertRaises(ValueError, self.client.msg, 'foo', 'bar', 3) | |
371 | |
372 def testMultipleLine(self): | |
373 maxLen = len('PRIVMSG foo :') + 3 + 2 # 2 for line endings | |
374 self.client.msg('foo', 'barbazbo', maxLen) | |
375 self.assertEquals(self.client.lines, ['PRIVMSG foo :bar', | |
376 'PRIVMSG foo :baz', | |
377 'PRIVMSG foo :bo']) | |
378 | |
379 def testSufficientWidth(self): | |
380 msg = 'barbazbo' | |
381 maxLen = len('PRIVMSG foo :%s' % (msg,)) + 2 | |
382 self.client.msg('foo', msg, maxLen) | |
383 self.assertEquals(self.client.lines, ['PRIVMSG foo :%s' % (msg,)]) | |
384 self.client.lines = [] | |
385 self.client.msg('foo', msg, maxLen-1) | |
386 self.assertEquals(2, len(self.client.lines)) | |
387 self.client.lines = [] | |
388 self.client.msg('foo', msg, maxLen+1) | |
389 self.assertEquals(1, len(self.client.lines)) | |
390 | |
391 def testSplitSanity(self): | |
392 # Whiteboxing | |
393 self.assertRaises(ValueError, irc.split, 'foo', -1) | |
394 self.assertRaises(ValueError, irc.split, 'foo', 0) | |
395 self.assertEquals([], irc.split('', 1)) | |
396 self.assertEquals([], irc.split('')) | |
397 | |
398 | |
399 class ClientTests(TestCase): | |
400 """ | |
401 Tests for the protocol-level behavior of IRCClient methods intended to | |
402 be called by application code. | |
403 """ | |
404 def setUp(self): | |
405 self.transport = StringIO() | |
406 self.protocol = IRCClient() | |
407 self.protocol.performLogin = False | |
408 self.protocol.makeConnection(self.transport) | |
409 | |
410 # Sanity check - we don't want anything to have happened at this | |
411 # point, since we're not in a test yet. | |
412 self.failIf(self.transport.getvalue()) | |
413 | |
414 | |
415 def test_register(self): | |
416 """ | |
417 Verify that the L{IRCClient.register} method sends a a USER command | |
418 with the correct arguments. | |
419 """ | |
420 username = 'testuser' | |
421 hostname = 'testhost' | |
422 servername = 'testserver' | |
423 self.protocol.realname = 'testname' | |
424 self.protocol.password = None | |
425 self.protocol.register(username, hostname, servername) | |
426 expected = [ | |
427 'NICK %s' % (username,), | |
428 'USER %s %s %s :%s' % ( | |
429 username, hostname, servername, self.protocol.realname), | |
430 ''] | |
431 self.assertEqual(self.transport.getvalue().split('\r\n'), expected) | |
432 | |
433 | |
434 def test_registerWithPassword(self): | |
435 """ | |
436 Verify that if the C{password} attribute of L{IRCClient} is not | |
437 C{None}, the C{register} method also authenticates using it. | |
438 """ | |
439 username = 'testuser' | |
440 hostname = 'testhost' | |
441 servername = 'testserver' | |
442 self.protocol.realname = 'testname' | |
443 self.protocol.password = 'testpass' | |
444 self.protocol.register(username, hostname, servername) | |
445 expected = [ | |
446 'PASS %s' % (self.protocol.password,), | |
447 'NICK %s' % (username,), | |
448 'USER %s %s %s :%s' % ( | |
449 username, hostname, servername, self.protocol.realname), | |
450 ''] | |
451 self.assertEqual(self.transport.getvalue().split('\r\n'), expected) | |
OLD | NEW |