Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(9)

Side by Side Diff: third_party/twisted_8_1/twisted/conch/test/test_telnet.py

Issue 12261012: Remove third_party/twisted_8_1 (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/build
Patch Set: Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 # -*- test-case-name: twisted.conch.test.test_telnet -*-
2 # Copyright (c) 2001-2007 Twisted Matrix Laboratories.
3 # See LICENSE for details.
4
5 """
6 Tests for L{twisted.conch.telnet}.
7 """
8
9 from zope.interface import implements
10
11 from twisted.internet import defer
12
13 from twisted.conch import telnet
14
15 from twisted.trial import unittest
16 from twisted.test import proto_helpers
17
18 class TestProtocol:
19 implements(telnet.ITelnetProtocol)
20
21 localEnableable = ()
22 remoteEnableable = ()
23
24 def __init__(self):
25 self.bytes = ''
26 self.subcmd = ''
27 self.calls = []
28
29 self.enabledLocal = []
30 self.enabledRemote = []
31 self.disabledLocal = []
32 self.disabledRemote = []
33
34 def makeConnection(self, transport):
35 d = transport.negotiationMap = {}
36 d['\x12'] = self.neg_TEST_COMMAND
37
38 d = transport.commandMap = transport.commandMap.copy()
39 for cmd in ('NOP', 'DM', 'BRK', 'IP', 'AO', 'AYT', 'EC', 'EL', 'GA'):
40 d[getattr(telnet, cmd)] = lambda arg, cmd=cmd: self.calls.append(cmd )
41
42 def dataReceived(self, bytes):
43 self.bytes += bytes
44
45 def connectionLost(self, reason):
46 pass
47
48 def neg_TEST_COMMAND(self, payload):
49 self.subcmd = payload
50
51 def enableLocal(self, option):
52 if option in self.localEnableable:
53 self.enabledLocal.append(option)
54 return True
55 return False
56
57 def disableLocal(self, option):
58 self.disabledLocal.append(option)
59
60 def enableRemote(self, option):
61 if option in self.remoteEnableable:
62 self.enabledRemote.append(option)
63 return True
64 return False
65
66 def disableRemote(self, option):
67 self.disabledRemote.append(option)
68
69
70
71 class TelnetTransportTestCase(unittest.TestCase):
72 """
73 Tests for L{telnet.TelnetTransport}.
74 """
75 def setUp(self):
76 self.p = telnet.TelnetTransport(TestProtocol)
77 self.t = proto_helpers.StringTransport()
78 self.p.makeConnection(self.t)
79
80 def testRegularBytes(self):
81 # Just send a bunch of bytes. None of these do anything
82 # with telnet. They should pass right through to the
83 # application layer.
84 h = self.p.protocol
85
86 L = ["here are some bytes la la la",
87 "some more arrive here",
88 "lots of bytes to play with",
89 "la la la",
90 "ta de da",
91 "dum"]
92 for b in L:
93 self.p.dataReceived(b)
94
95 self.assertEquals(h.bytes, ''.join(L))
96
97 def testNewlineHandling(self):
98 # Send various kinds of newlines and make sure they get translated
99 # into \n.
100 h = self.p.protocol
101
102 L = ["here is the first line\r\n",
103 "here is the second line\r\0",
104 "here is the third line\r\n",
105 "here is the last line\r\0"]
106
107 for b in L:
108 self.p.dataReceived(b)
109
110 self.assertEquals(h.bytes, L[0][:-2] + '\n' +
111 L[1][:-2] + '\r' +
112 L[2][:-2] + '\n' +
113 L[3][:-2] + '\r')
114
115 def testIACEscape(self):
116 # Send a bunch of bytes and a couple quoted \xFFs. Unquoted,
117 # \xFF is a telnet command. Quoted, one of them from each pair
118 # should be passed through to the application layer.
119 h = self.p.protocol
120
121 L = ["here are some bytes\xff\xff with an embedded IAC",
122 "and here is a test of a border escape\xff",
123 "\xff did you get that IAC?"]
124
125 for b in L:
126 self.p.dataReceived(b)
127
128 self.assertEquals(h.bytes, ''.join(L).replace('\xff\xff', '\xff'))
129
130 def _simpleCommandTest(self, cmdName):
131 # Send a single simple telnet command and make sure
132 # it gets noticed and the appropriate method gets
133 # called.
134 h = self.p.protocol
135
136 cmd = telnet.IAC + getattr(telnet, cmdName)
137 L = ["Here's some bytes, tra la la",
138 "But ono!" + cmd + " an interrupt"]
139
140 for b in L:
141 self.p.dataReceived(b)
142
143 self.assertEquals(h.calls, [cmdName])
144 self.assertEquals(h.bytes, ''.join(L).replace(cmd, ''))
145
146 def testInterrupt(self):
147 self._simpleCommandTest("IP")
148
149 def testNoOperation(self):
150 self._simpleCommandTest("NOP")
151
152 def testDataMark(self):
153 self._simpleCommandTest("DM")
154
155 def testBreak(self):
156 self._simpleCommandTest("BRK")
157
158 def testAbortOutput(self):
159 self._simpleCommandTest("AO")
160
161 def testAreYouThere(self):
162 self._simpleCommandTest("AYT")
163
164 def testEraseCharacter(self):
165 self._simpleCommandTest("EC")
166
167 def testEraseLine(self):
168 self._simpleCommandTest("EL")
169
170 def testGoAhead(self):
171 self._simpleCommandTest("GA")
172
173 def testSubnegotiation(self):
174 # Send a subnegotiation command and make sure it gets
175 # parsed and that the correct method is called.
176 h = self.p.protocol
177
178 cmd = telnet.IAC + telnet.SB + '\x12hello world' + telnet.IAC + telnet.S E
179 L = ["These are some bytes but soon" + cmd,
180 "there will be some more"]
181
182 for b in L:
183 self.p.dataReceived(b)
184
185 self.assertEquals(h.bytes, ''.join(L).replace(cmd, ''))
186 self.assertEquals(h.subcmd, list("hello world"))
187
188 def testSubnegotiationWithEmbeddedSE(self):
189 # Send a subnegotiation command with an embedded SE. Make sure
190 # that SE gets passed to the correct method.
191 h = self.p.protocol
192
193 cmd = (telnet.IAC + telnet.SB +
194 '\x12' + telnet.SE +
195 telnet.IAC + telnet.SE)
196
197 L = ["Some bytes are here" + cmd + "and here",
198 "and here"]
199
200 for b in L:
201 self.p.dataReceived(b)
202
203 self.assertEquals(h.bytes, ''.join(L).replace(cmd, ''))
204 self.assertEquals(h.subcmd, [telnet.SE])
205
206 def testBoundarySubnegotiation(self):
207 # Send a subnegotiation command. Split it at every possible byte bounda ry
208 # and make sure it always gets parsed and that it is passed to the corre ct
209 # method.
210 cmd = (telnet.IAC + telnet.SB +
211 '\x12' + telnet.SE + 'hello' +
212 telnet.IAC + telnet.SE)
213
214 for i in range(len(cmd)):
215 h = self.p.protocol = TestProtocol()
216 h.makeConnection(self.p)
217
218 a, b = cmd[:i], cmd[i:]
219 L = ["first part" + a,
220 b + "last part"]
221
222 for bytes in L:
223 self.p.dataReceived(bytes)
224
225 self.assertEquals(h.bytes, ''.join(L).replace(cmd, ''))
226 self.assertEquals(h.subcmd, [telnet.SE] + list('hello'))
227
228 def _enabledHelper(self, o, eL=[], eR=[], dL=[], dR=[]):
229 self.assertEquals(o.enabledLocal, eL)
230 self.assertEquals(o.enabledRemote, eR)
231 self.assertEquals(o.disabledLocal, dL)
232 self.assertEquals(o.disabledRemote, dR)
233
234 def testRefuseWill(self):
235 # Try to enable an option. The server should refuse to enable it.
236 cmd = telnet.IAC + telnet.WILL + '\x12'
237
238 bytes = "surrounding bytes" + cmd + "to spice things up"
239 self.p.dataReceived(bytes)
240
241 self.assertEquals(self.p.protocol.bytes, bytes.replace(cmd, ''))
242 self.assertEquals(self.t.value(), telnet.IAC + telnet.DONT + '\x12')
243 self._enabledHelper(self.p.protocol)
244
245 def testRefuseDo(self):
246 # Try to enable an option. The server should refuse to enable it.
247 cmd = telnet.IAC + telnet.DO + '\x12'
248
249 bytes = "surrounding bytes" + cmd + "to spice things up"
250 self.p.dataReceived(bytes)
251
252 self.assertEquals(self.p.protocol.bytes, bytes.replace(cmd, ''))
253 self.assertEquals(self.t.value(), telnet.IAC + telnet.WONT + '\x12')
254 self._enabledHelper(self.p.protocol)
255
256 def testAcceptDo(self):
257 # Try to enable an option. The option is in our allowEnable
258 # list, so we will allow it to be enabled.
259 cmd = telnet.IAC + telnet.DO + '\x19'
260 bytes = 'padding' + cmd + 'trailer'
261
262 h = self.p.protocol
263 h.localEnableable = ('\x19',)
264 self.p.dataReceived(bytes)
265
266 self.assertEquals(self.t.value(), telnet.IAC + telnet.WILL + '\x19')
267 self._enabledHelper(h, eL=['\x19'])
268
269 def testAcceptWill(self):
270 # Same as testAcceptDo, but reversed.
271 cmd = telnet.IAC + telnet.WILL + '\x91'
272 bytes = 'header' + cmd + 'padding'
273
274 h = self.p.protocol
275 h.remoteEnableable = ('\x91',)
276 self.p.dataReceived(bytes)
277
278 self.assertEquals(self.t.value(), telnet.IAC + telnet.DO + '\x91')
279 self._enabledHelper(h, eR=['\x91'])
280
281 def testAcceptWont(self):
282 # Try to disable an option. The server must allow any option to
283 # be disabled at any time. Make sure it disables it and sends
284 # back an acknowledgement of this.
285 cmd = telnet.IAC + telnet.WONT + '\x29'
286
287 # Jimmy it - after these two lines, the server will be in a state
288 # such that it believes the option to have been previously enabled
289 # via normal negotiation.
290 s = self.p.getOptionState('\x29')
291 s.him.state = 'yes'
292
293 bytes = "fiddle dee" + cmd
294 self.p.dataReceived(bytes)
295
296 self.assertEquals(self.p.protocol.bytes, bytes.replace(cmd, ''))
297 self.assertEquals(self.t.value(), telnet.IAC + telnet.DONT + '\x29')
298 self.assertEquals(s.him.state, 'no')
299 self._enabledHelper(self.p.protocol, dR=['\x29'])
300
301 def testAcceptDont(self):
302 # Try to disable an option. The server must allow any option to
303 # be disabled at any time. Make sure it disables it and sends
304 # back an acknowledgement of this.
305 cmd = telnet.IAC + telnet.DONT + '\x29'
306
307 # Jimmy it - after these two lines, the server will be in a state
308 # such that it believes the option to have beenp previously enabled
309 # via normal negotiation.
310 s = self.p.getOptionState('\x29')
311 s.us.state = 'yes'
312
313 bytes = "fiddle dum " + cmd
314 self.p.dataReceived(bytes)
315
316 self.assertEquals(self.p.protocol.bytes, bytes.replace(cmd, ''))
317 self.assertEquals(self.t.value(), telnet.IAC + telnet.WONT + '\x29')
318 self.assertEquals(s.us.state, 'no')
319 self._enabledHelper(self.p.protocol, dL=['\x29'])
320
321 def testIgnoreWont(self):
322 # Try to disable an option. The option is already disabled. The
323 # server should send nothing in response to this.
324 cmd = telnet.IAC + telnet.WONT + '\x47'
325
326 bytes = "dum de dum" + cmd + "tra la la"
327 self.p.dataReceived(bytes)
328
329 self.assertEquals(self.p.protocol.bytes, bytes.replace(cmd, ''))
330 self.assertEquals(self.t.value(), '')
331 self._enabledHelper(self.p.protocol)
332
333 def testIgnoreDont(self):
334 # Try to disable an option. The option is already disabled. The
335 # server should send nothing in response to this. Doing so could
336 # lead to a negotiation loop.
337 cmd = telnet.IAC + telnet.DONT + '\x47'
338
339 bytes = "dum de dum" + cmd + "tra la la"
340 self.p.dataReceived(bytes)
341
342 self.assertEquals(self.p.protocol.bytes, bytes.replace(cmd, ''))
343 self.assertEquals(self.t.value(), '')
344 self._enabledHelper(self.p.protocol)
345
346 def testIgnoreWill(self):
347 # Try to enable an option. The option is already enabled. The
348 # server should send nothing in response to this. Doing so could
349 # lead to a negotiation loop.
350 cmd = telnet.IAC + telnet.WILL + '\x56'
351
352 # Jimmy it - after these two lines, the server will be in a state
353 # such that it believes the option to have been previously enabled
354 # via normal negotiation.
355 s = self.p.getOptionState('\x56')
356 s.him.state = 'yes'
357
358 bytes = "tra la la" + cmd + "dum de dum"
359 self.p.dataReceived(bytes)
360
361 self.assertEquals(self.p.protocol.bytes, bytes.replace(cmd, ''))
362 self.assertEquals(self.t.value(), '')
363 self._enabledHelper(self.p.protocol)
364
365 def testIgnoreDo(self):
366 # Try to enable an option. The option is already enabled. The
367 # server should send nothing in response to this. Doing so could
368 # lead to a negotiation loop.
369 cmd = telnet.IAC + telnet.DO + '\x56'
370
371 # Jimmy it - after these two lines, the server will be in a state
372 # such that it believes the option to have been previously enabled
373 # via normal negotiation.
374 s = self.p.getOptionState('\x56')
375 s.us.state = 'yes'
376
377 bytes = "tra la la" + cmd + "dum de dum"
378 self.p.dataReceived(bytes)
379
380 self.assertEquals(self.p.protocol.bytes, bytes.replace(cmd, ''))
381 self.assertEquals(self.t.value(), '')
382 self._enabledHelper(self.p.protocol)
383
384 def testAcceptedEnableRequest(self):
385 # Try to enable an option through the user-level API. This
386 # returns a Deferred that fires when negotiation about the option
387 # finishes. Make sure it fires, make sure state gets updated
388 # properly, make sure the result indicates the option was enabled.
389 d = self.p.do('\x42')
390
391 h = self.p.protocol
392 h.remoteEnableable = ('\x42',)
393
394 self.assertEquals(self.t.value(), telnet.IAC + telnet.DO + '\x42')
395
396 self.p.dataReceived(telnet.IAC + telnet.WILL + '\x42')
397
398 d.addCallback(self.assertEquals, True)
399 d.addCallback(lambda _: self._enabledHelper(h, eR=['\x42']))
400 return d
401
402 def testRefusedEnableRequest(self):
403 # Try to enable an option through the user-level API. This
404 # returns a Deferred that fires when negotiation about the option
405 # finishes. Make sure it fires, make sure state gets updated
406 # properly, make sure the result indicates the option was enabled.
407 d = self.p.do('\x42')
408
409 self.assertEquals(self.t.value(), telnet.IAC + telnet.DO + '\x42')
410
411 self.p.dataReceived(telnet.IAC + telnet.WONT + '\x42')
412
413 d = self.assertFailure(d, telnet.OptionRefused)
414 d.addCallback(lambda _: self._enabledHelper(self.p.protocol))
415 return d
416
417 def testAcceptedDisableRequest(self):
418 # Try to disable an option through the user-level API. This
419 # returns a Deferred that fires when negotiation about the option
420 # finishes. Make sure it fires, make sure state gets updated
421 # properly, make sure the result indicates the option was enabled.
422 s = self.p.getOptionState('\x42')
423 s.him.state = 'yes'
424
425 d = self.p.dont('\x42')
426
427 self.assertEquals(self.t.value(), telnet.IAC + telnet.DONT + '\x42')
428
429 self.p.dataReceived(telnet.IAC + telnet.WONT + '\x42')
430
431 d.addCallback(self.assertEquals, True)
432 d.addCallback(lambda _: self._enabledHelper(self.p.protocol,
433 dR=['\x42']))
434 return d
435
436 def testNegotiationBlocksFurtherNegotiation(self):
437 # Try to disable an option, then immediately try to enable it, then
438 # immediately try to disable it. Ensure that the 2nd and 3rd calls
439 # fail quickly with the right exception.
440 s = self.p.getOptionState('\x24')
441 s.him.state = 'yes'
442 d2 = self.p.dont('\x24') # fires after the first line of _final
443
444 def _do(x):
445 d = self.p.do('\x24')
446 return self.assertFailure(d, telnet.AlreadyNegotiating)
447
448 def _dont(x):
449 d = self.p.dont('\x24')
450 return self.assertFailure(d, telnet.AlreadyNegotiating)
451
452 def _final(x):
453 self.p.dataReceived(telnet.IAC + telnet.WONT + '\x24')
454 # an assertion that only passes if d2 has fired
455 self._enabledHelper(self.p.protocol, dR=['\x24'])
456 # Make sure we allow this
457 self.p.protocol.remoteEnableable = ('\x24',)
458 d = self.p.do('\x24')
459 self.p.dataReceived(telnet.IAC + telnet.WILL + '\x24')
460 d.addCallback(self.assertEquals, True)
461 d.addCallback(lambda _: self._enabledHelper(self.p.protocol,
462 eR=['\x24'],
463 dR=['\x24']))
464 return d
465
466 d = _do(None)
467 d.addCallback(_dont)
468 d.addCallback(_final)
469 return d
470
471 def testSuperfluousDisableRequestRaises(self):
472 # Try to disable a disabled option. Make sure it fails properly.
473 d = self.p.dont('\xab')
474 return self.assertFailure(d, telnet.AlreadyDisabled)
475
476 def testSuperfluousEnableRequestRaises(self):
477 # Try to disable a disabled option. Make sure it fails properly.
478 s = self.p.getOptionState('\xab')
479 s.him.state = 'yes'
480 d = self.p.do('\xab')
481 return self.assertFailure(d, telnet.AlreadyEnabled)
482
483 def testLostConnectionFailsDeferreds(self):
484 d1 = self.p.do('\x12')
485 d2 = self.p.do('\x23')
486 d3 = self.p.do('\x34')
487
488 class TestException(Exception):
489 pass
490
491 self.p.connectionLost(TestException("Total failure!"))
492
493 d1 = self.assertFailure(d1, TestException)
494 d2 = self.assertFailure(d2, TestException)
495 d3 = self.assertFailure(d3, TestException)
496 return defer.gatherResults([d1, d2, d3])
497
498
499 class TestTelnet(telnet.Telnet):
500 """
501 A trivial extension of the telnet protocol class useful to unit tests.
502 """
503 def __init__(self):
504 telnet.Telnet.__init__(self)
505 self.events = []
506
507
508 def applicationDataReceived(self, bytes):
509 """
510 Record the given data in C{self.events}.
511 """
512 self.events.append(('bytes', bytes))
513
514
515 def unhandledCommand(self, command, bytes):
516 """
517 Record the given command in C{self.events}.
518 """
519 self.events.append(('command', command, bytes))
520
521
522 def unhandledSubnegotiation(self, command, bytes):
523 """
524 Record the given subnegotiation command in C{self.events}.
525 """
526 self.events.append(('negotiate', command, bytes))
527
528
529
530 class TelnetTests(unittest.TestCase):
531 """
532 Tests for L{telnet.Telnet}.
533
534 L{telnet.Telnet} implements the TELNET protocol (RFC 854), including option
535 and suboption negotiation, and option state tracking.
536 """
537 def setUp(self):
538 """
539 Create an unconnected L{telnet.Telnet} to be used by tests.
540 """
541 self.protocol = TestTelnet()
542
543
544 def test_enableLocal(self):
545 """
546 L{telnet.Telnet.enableLocal} should reject all options, since
547 L{telnet.Telnet} does not know how to implement any options.
548 """
549 self.assertFalse(self.protocol.enableLocal('\0'))
550
551
552 def test_enableRemote(self):
553 """
554 L{telnet.Telnet.enableRemote} should reject all options, since
555 L{telnet.Telnet} does not know how to implement any options.
556 """
557 self.assertFalse(self.protocol.enableRemote('\0'))
558
559
560 def test_disableLocal(self):
561 """
562 It is an error for L{telnet.Telnet.disableLocal} to be called, since
563 L{telnet.Telnet.enableLocal} will never allow any options to be enabled
564 locally. If a subclass overrides enableLocal, it must also override
565 disableLocal.
566 """
567 self.assertRaises(NotImplementedError, self.protocol.disableLocal, '\0')
568
569
570 def test_disableRemote(self):
571 """
572 It is an error for L{telnet.Telnet.disableRemote} to be called, since
573 L{telnet.Telnet.enableRemote} will never allow any options to be
574 enabled remotely. If a subclass overrides enableRemote, it must also
575 override disableRemote.
576 """
577 self.assertRaises(NotImplementedError, self.protocol.disableRemote, '\0' )
578
579
580 def _deliver(self, bytes, *expected):
581 """
582 Pass the given bytes to the protocol's C{dataReceived} method and
583 assert that the given events occur.
584 """
585 received = self.protocol.events = []
586 self.protocol.dataReceived(bytes)
587 self.assertEqual(received, list(expected))
588
589
590 def test_oneApplicationDataByte(self):
591 """
592 One application-data byte in the default state gets delivered right
593 away.
594 """
595 self._deliver('a', ('bytes', 'a'))
596
597
598 def test_twoApplicationDataBytes(self):
599 """
600 Two application-data bytes in the default state get delivered
601 together.
602 """
603 self._deliver('bc', ('bytes', 'bc'))
604
605
606 def test_threeApplicationDataBytes(self):
607 """
608 Three application-data bytes followed by a control byte get
609 delivered, but the control byte doesn't.
610 """
611 self._deliver('def' + telnet.IAC, ('bytes', 'def'))
612
613
614 def test_escapedControl(self):
615 """
616 IAC in the escaped state gets delivered and so does another
617 application-data byte following it.
618 """
619 self._deliver(telnet.IAC)
620 self._deliver(telnet.IAC + 'g', ('bytes', telnet.IAC + 'g'))
621
622
623 def test_carriageReturn(self):
624 """
625 A carriage return only puts the protocol into the newline state. A
626 linefeed in the newline state causes just the newline to be
627 delivered. A nul in the newline state causes a carriage return to
628 be delivered. Anything else causes a carriage return and that thing
629 to be delivered.
630 """
631 self._deliver('\r')
632 self._deliver('\n', ('bytes', '\n'))
633 self._deliver('\r\n', ('bytes', '\n'))
634
635 self._deliver('\r')
636 self._deliver('\0', ('bytes', '\r'))
637 self._deliver('\r\0', ('bytes', '\r'))
638
639 self._deliver('\r')
640 self._deliver('a', ('bytes', '\ra'))
641 self._deliver('\ra', ('bytes', '\ra'))
642
643
644 def test_applicationDataBeforeSimpleCommand(self):
645 """
646 Application bytes received before a command are delivered before the
647 command is processed.
648 """
649 self._deliver(
650 'x' + telnet.IAC + telnet.NOP,
651 ('bytes', 'x'), ('command', telnet.NOP, None))
652
653
654 def test_applicationDataBeforeCommand(self):
655 """
656 Application bytes received before a WILL/WONT/DO/DONT are delivered
657 before the command is processed.
658 """
659 self.protocol.commandMap = {}
660 self._deliver(
661 'y' + telnet.IAC + telnet.WILL + '\x00',
662 ('bytes', 'y'), ('command', telnet.WILL, '\x00'))
663
664
665 def test_applicationDataBeforeSubnegotiation(self):
666 """
667 Application bytes received before a subnegotiation command are
668 delivered before the negotiation is processed.
669 """
670 self._deliver(
671 'z' + telnet.IAC + telnet.SB + 'Qx' + telnet.IAC + telnet.SE,
672 ('bytes', 'z'), ('negotiate', 'Q', ['x']))
OLDNEW
« no previous file with comments | « third_party/twisted_8_1/twisted/conch/test/test_ssh.py ('k') | third_party/twisted_8_1/twisted/conch/test/test_text.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698