Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(61)

Side by Side Diff: third_party/twisted_8_1/twisted/words/test/test_irc.py

Issue 12261012: Remove third_party/twisted_8_1 (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/build
Patch Set: Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 # Copyright (c) 2001-2005 Twisted Matrix Laboratories.
2 # See LICENSE for details.
3
4 from StringIO import StringIO
5 import time
6
7 from twisted.trial import unittest
8 from twisted.trial.unittest import TestCase
9 from twisted.words.protocols import irc
10 from twisted.words.protocols.irc import IRCClient
11 from twisted.internet import protocol
12
13
14 class StringIOWithoutClosing(StringIO):
15 def close(self):
16 pass
17
18 stringSubjects = [
19 "Hello, this is a nice string with no complications.",
20 "xargs%(NUL)smight%(NUL)slike%(NUL)sthis" % {'NUL': irc.NUL },
21 "embedded%(CR)snewline%(CR)s%(NL)sFUN%(NL)s" % {'CR': irc.CR,
22 'NL': irc.NL},
23 "escape!%(X)s escape!%(M)s %(X)s%(X)sa %(M)s0" % {'X': irc.X_QUOTE,
24 'M': irc.M_QUOTE}
25 ]
26
27
28 class QuotingTest(unittest.TestCase):
29 def test_lowquoteSanity(self):
30 """Testing client-server level quote/dequote"""
31 for s in stringSubjects:
32 self.failUnlessEqual(s, irc.lowDequote(irc.lowQuote(s)))
33
34 def test_ctcpquoteSanity(self):
35 """Testing CTCP message level quote/dequote"""
36 for s in stringSubjects:
37 self.failUnlessEqual(s, irc.ctcpDequote(irc.ctcpQuote(s)))
38
39
40 class IRCClientWithoutLogin(irc.IRCClient):
41 performLogin = 0
42
43
44 class CTCPTest(unittest.TestCase):
45 def setUp(self):
46 self.file = StringIOWithoutClosing()
47 self.transport = protocol.FileWrapper(self.file)
48 self.client = IRCClientWithoutLogin()
49 self.client.makeConnection(self.transport)
50
51 def test_ERRMSG(self):
52 """Testing CTCP query ERRMSG.
53
54 Not because this is this is an especially important case in the
55 field, but it does go through the entire dispatch/decode/encode
56 process.
57 """
58
59 errQuery = (":nick!guy@over.there PRIVMSG #theChan :"
60 "%(X)cERRMSG t%(X)c%(EOL)s"
61 % {'X': irc.X_DELIM,
62 'EOL': irc.CR + irc.LF})
63
64 errReply = ("NOTICE nick :%(X)cERRMSG t :"
65 "No error has occoured.%(X)c%(EOL)s"
66 % {'X': irc.X_DELIM,
67 'EOL': irc.CR + irc.LF})
68
69 self.client.dataReceived(errQuery)
70 reply = self.file.getvalue()
71
72 self.failUnlessEqual(errReply, reply)
73
74 def tearDown(self):
75 self.transport.loseConnection()
76 self.client.connectionLost()
77 del self.client
78 del self.transport
79
80 class NoticingClient(object, IRCClientWithoutLogin):
81 methods = {
82 'created': ('when',),
83 'yourHost': ('info',),
84 'myInfo': ('servername', 'version', 'umodes', 'cmodes'),
85 'luserClient': ('info',),
86 'bounce': ('info',),
87 'isupport': ('options',),
88 'luserChannels': ('channels',),
89 'luserOp': ('ops',),
90 'luserMe': ('info',),
91 'receivedMOTD': ('motd',),
92
93 'privmsg': ('user', 'channel', 'message'),
94 'joined': ('channel',),
95 'left': ('channel',),
96 'noticed': ('user', 'channel', 'message'),
97 'modeChanged': ('user', 'channel', 'set', 'modes', 'args'),
98 'pong': ('user', 'secs'),
99 'signedOn': (),
100 'kickedFrom': ('channel', 'kicker', 'message'),
101 'nickChanged': ('nick',),
102
103 'userJoined': ('user', 'channel'),
104 'userLeft': ('user', 'channel'),
105 'userKicked': ('user', 'channel', 'kicker', 'message'),
106 'action': ('user', 'channel', 'data'),
107 'topicUpdated': ('user', 'channel', 'newTopic'),
108 'userRenamed': ('oldname', 'newname')}
109
110 def __init__(self, *a, **kw):
111 object.__init__(self)
112 self.calls = []
113
114 def __getattribute__(self, name):
115 if name.startswith('__') and name.endswith('__'):
116 return super(NoticingClient, self).__getattribute__(name)
117 try:
118 args = super(NoticingClient, self).__getattribute__('methods')[name]
119 except KeyError:
120 return super(NoticingClient, self).__getattribute__(name)
121 else:
122 return self.makeMethod(name, args)
123
124 def makeMethod(self, fname, args):
125 def method(*a, **kw):
126 if len(a) > len(args):
127 raise TypeError("TypeError: %s() takes %d arguments "
128 "(%d given)" % (fname, len(args), len(a)))
129 for (name, value) in zip(args, a):
130 if name in kw:
131 raise TypeError("TypeError: %s() got multiple values "
132 "for keyword argument '%s'" % (fname, name))
133 else:
134 kw[name] = value
135 if len(kw) != len(args):
136 raise TypeError("TypeError: %s() takes %d arguments "
137 "(%d given)" % (fname, len(args), len(a)))
138 self.calls.append((fname, kw))
139 return method
140
141 def pop(dict, key, default):
142 try:
143 value = dict[key]
144 except KeyError:
145 return default
146 else:
147 del dict[key]
148 return value
149
150 class ModeTestCase(unittest.TestCase):
151 def setUp(self):
152 self.file = StringIOWithoutClosing()
153 self.transport = protocol.FileWrapper(self.file)
154 self.client = NoticingClient()
155 self.client.makeConnection(self.transport)
156
157 def tearDown(self):
158 self.transport.loseConnection()
159 self.client.connectionLost()
160 del self.client
161 del self.transport
162
163 def testModeChange(self):
164 message = ":ChanServ!ChanServ@services. MODE #tanstaafl +o exarkun\r\n"
165 self.client.dataReceived(message)
166 self.assertEquals(
167 self.client.calls,
168 [('modeChanged', {'user': "ChanServ!ChanServ@services.",
169 'channel': '#tanstaafl',
170 'set': True,
171 'modes': 'o',
172 'args': ('exarkun',)})])
173
174 def _serverTestImpl(self, code, msg, func, **kw):
175 host = pop(kw, 'host', 'server.host')
176 nick = pop(kw, 'nick', 'nickname')
177 args = pop(kw, 'args', '')
178
179 message = (":" +
180 host + " " +
181 code + " " +
182 nick + " " +
183 args + " :" +
184 msg + "\r\n")
185
186 self.client.dataReceived(message)
187 self.assertEquals(
188 self.client.calls,
189 [(func, kw)])
190
191 def testYourHost(self):
192 msg = "Your host is some.host[blah.blah/6667], running version server-ve rsion-3"
193 self._serverTestImpl("002", msg, "yourHost", info=msg)
194
195 def testCreated(self):
196 msg = "This server was cobbled together Fri Aug 13 18:00:25 UTC 2004"
197 self._serverTestImpl("003", msg, "created", when=msg)
198
199 def testMyInfo(self):
200 msg = "server.host server-version abcDEF bcdEHI"
201 self._serverTestImpl("004", msg, "myInfo",
202 servername="server.host",
203 version="server-version",
204 umodes="abcDEF",
205 cmodes="bcdEHI")
206
207 def testLuserClient(self):
208 msg = "There are 9227 victims and 9542 hiding on 24 servers"
209 self._serverTestImpl("251", msg, "luserClient",
210 info=msg)
211
212 def testISupport(self):
213 args = ("MODES=4 CHANLIMIT=#:20 NICKLEN=16 USERLEN=10 HOSTLEN=63 "
214 "TOPICLEN=450 KICKLEN=450 CHANNELLEN=30 KEYLEN=23 CHANTYPES=# "
215 "PREFIX=(ov)@+ CASEMAPPING=ascii CAPAB IRCD=dancer")
216 msg = "are available on this server"
217 self._serverTestImpl("005", msg, "isupport", args=args,
218 options=['MODES=4',
219 'CHANLIMIT=#:20',
220 'NICKLEN=16',
221 'USERLEN=10',
222 'HOSTLEN=63',
223 'TOPICLEN=450',
224 'KICKLEN=450',
225 'CHANNELLEN=30',
226 'KEYLEN=23',
227 'CHANTYPES=#',
228 'PREFIX=(ov)@+',
229 'CASEMAPPING=ascii',
230 'CAPAB',
231 'IRCD=dancer'])
232
233 def testBounce(self):
234 msg = "Try server some.host, port 321"
235 self._serverTestImpl("005", msg, "bounce",
236 info=msg)
237
238 def testLuserChannels(self):
239 args = "7116"
240 msg = "channels formed"
241 self._serverTestImpl("254", msg, "luserChannels", args=args,
242 channels=int(args))
243
244 def testLuserOp(self):
245 args = "34"
246 msg = "flagged staff members"
247 self._serverTestImpl("252", msg, "luserOp", args=args,
248 ops=int(args))
249
250 def testLuserMe(self):
251 msg = "I have 1937 clients and 0 servers"
252 self._serverTestImpl("255", msg, "luserMe",
253 info=msg)
254
255 def testMOTD(self):
256 lines = [
257 ":host.name 375 nickname :- host.name Message of the Day -",
258 ":host.name 372 nickname :- Welcome to host.name",
259 ":host.name 376 nickname :End of /MOTD command."]
260 for L in lines:
261 self.assertEquals(self.client.calls, [])
262 self.client.dataReceived(L + '\r\n')
263
264 self.assertEquals(
265 self.client.calls,
266 [("receivedMOTD", {"motd": ["host.name Message of the Day -", "Welco me to host.name"]})])
267
268
269 def _clientTestImpl(self, sender, group, type, msg, func, **kw):
270 ident = pop(kw, 'ident', 'ident')
271 host = pop(kw, 'host', 'host')
272
273 wholeUser = sender + '!' + ident + '@' + host
274 message = (":" +
275 wholeUser + " " +
276 type + " " +
277 group + " :" +
278 msg + "\r\n")
279 self.client.dataReceived(message)
280 self.assertEquals(
281 self.client.calls,
282 [(func, kw)])
283 self.client.calls = []
284
285 def testPrivmsg(self):
286 msg = "Tooty toot toot."
287 self._clientTestImpl("sender", "#group", "PRIVMSG", msg, "privmsg",
288 ident="ident", host="host",
289 # Expected results below
290 user="sender!ident@host",
291 channel="#group",
292 message=msg)
293
294 self._clientTestImpl("sender", "recipient", "PRIVMSG", msg, "privmsg",
295 ident="ident", host="host",
296 # Expected results below
297 user="sender!ident@host",
298 channel="recipient",
299 message=msg)
300
301 class BasicServerFunctionalityTestCase(unittest.TestCase):
302 def setUp(self):
303 self.f = StringIOWithoutClosing()
304 self.t = protocol.FileWrapper(self.f)
305 self.p = irc.IRC()
306 self.p.makeConnection(self.t)
307
308 def check(self, s):
309 self.assertEquals(self.f.getvalue(), s)
310
311 def testPrivmsg(self):
312 self.p.privmsg("this-is-sender", "this-is-recip", "this is message")
313 self.check(":this-is-sender PRIVMSG this-is-recip :this is message\r\n")
314
315 def testNotice(self):
316 self.p.notice("this-is-sender", "this-is-recip", "this is notice")
317 self.check(":this-is-sender NOTICE this-is-recip :this is notice\r\n")
318
319 def testAction(self):
320 self.p.action("this-is-sender", "this-is-recip", "this is action")
321 self.check(":this-is-sender ACTION this-is-recip :this is action\r\n")
322
323 def testJoin(self):
324 self.p.join("this-person", "#this-channel")
325 self.check(":this-person JOIN #this-channel\r\n")
326
327 def testPart(self):
328 self.p.part("this-person", "#that-channel")
329 self.check(":this-person PART #that-channel\r\n")
330
331 def testWhois(self):
332 """
333 Verify that a whois by the client receives the right protocol actions
334 from the server.
335 """
336 timestamp = int(time.time()-100)
337 hostname = self.p.hostname
338 req = 'requesting-nick'
339 targ = 'target-nick'
340 self.p.whois(req, targ, 'target', 'host.com',
341 'Target User', 'irc.host.com', 'A fake server', False,
342 12, timestamp, ['#fakeusers', '#fakemisc'])
343 expected = '\r\n'.join([
344 ':%(hostname)s 311 %(req)s %(targ)s target host.com * :Target User',
345 ':%(hostname)s 312 %(req)s %(targ)s irc.host.com :A fake server',
346 ':%(hostname)s 317 %(req)s %(targ)s 12 %(timestamp)s :seconds idle, signon time' ,
347 ':%(hostname)s 319 %(req)s %(targ)s :#fakeusers #fakemisc',
348 ':%(hostname)s 318 %(req)s %(targ)s :End of WHOIS list.',
349 '']) % dict(hostname=hostname, timestamp=timestamp, req=req, targ=targ)
350 self.check(expected)
351
352
353 class DummyClient(irc.IRCClient):
354 def __init__(self):
355 self.lines = []
356 def sendLine(self, m):
357 self.lines.append(m)
358
359
360 class ClientMsgTests(unittest.TestCase):
361 def setUp(self):
362 self.client = DummyClient()
363
364 def testSingleLine(self):
365 self.client.msg('foo', 'bar')
366 self.assertEquals(self.client.lines, ['PRIVMSG foo :bar'])
367
368 def testDodgyMaxLength(self):
369 self.assertRaises(ValueError, self.client.msg, 'foo', 'bar', 0)
370 self.assertRaises(ValueError, self.client.msg, 'foo', 'bar', 3)
371
372 def testMultipleLine(self):
373 maxLen = len('PRIVMSG foo :') + 3 + 2 # 2 for line endings
374 self.client.msg('foo', 'barbazbo', maxLen)
375 self.assertEquals(self.client.lines, ['PRIVMSG foo :bar',
376 'PRIVMSG foo :baz',
377 'PRIVMSG foo :bo'])
378
379 def testSufficientWidth(self):
380 msg = 'barbazbo'
381 maxLen = len('PRIVMSG foo :%s' % (msg,)) + 2
382 self.client.msg('foo', msg, maxLen)
383 self.assertEquals(self.client.lines, ['PRIVMSG foo :%s' % (msg,)])
384 self.client.lines = []
385 self.client.msg('foo', msg, maxLen-1)
386 self.assertEquals(2, len(self.client.lines))
387 self.client.lines = []
388 self.client.msg('foo', msg, maxLen+1)
389 self.assertEquals(1, len(self.client.lines))
390
391 def testSplitSanity(self):
392 # Whiteboxing
393 self.assertRaises(ValueError, irc.split, 'foo', -1)
394 self.assertRaises(ValueError, irc.split, 'foo', 0)
395 self.assertEquals([], irc.split('', 1))
396 self.assertEquals([], irc.split(''))
397
398
399 class ClientTests(TestCase):
400 """
401 Tests for the protocol-level behavior of IRCClient methods intended to
402 be called by application code.
403 """
404 def setUp(self):
405 self.transport = StringIO()
406 self.protocol = IRCClient()
407 self.protocol.performLogin = False
408 self.protocol.makeConnection(self.transport)
409
410 # Sanity check - we don't want anything to have happened at this
411 # point, since we're not in a test yet.
412 self.failIf(self.transport.getvalue())
413
414
415 def test_register(self):
416 """
417 Verify that the L{IRCClient.register} method sends a a USER command
418 with the correct arguments.
419 """
420 username = 'testuser'
421 hostname = 'testhost'
422 servername = 'testserver'
423 self.protocol.realname = 'testname'
424 self.protocol.password = None
425 self.protocol.register(username, hostname, servername)
426 expected = [
427 'NICK %s' % (username,),
428 'USER %s %s %s :%s' % (
429 username, hostname, servername, self.protocol.realname),
430 '']
431 self.assertEqual(self.transport.getvalue().split('\r\n'), expected)
432
433
434 def test_registerWithPassword(self):
435 """
436 Verify that if the C{password} attribute of L{IRCClient} is not
437 C{None}, the C{register} method also authenticates using it.
438 """
439 username = 'testuser'
440 hostname = 'testhost'
441 servername = 'testserver'
442 self.protocol.realname = 'testname'
443 self.protocol.password = 'testpass'
444 self.protocol.register(username, hostname, servername)
445 expected = [
446 'PASS %s' % (self.protocol.password,),
447 'NICK %s' % (username,),
448 'USER %s %s %s :%s' % (
449 username, hostname, servername, self.protocol.realname),
450 '']
451 self.assertEqual(self.transport.getvalue().split('\r\n'), expected)
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698