OLD | NEW |
| (Empty) |
1 # -*- test-case-name: twisted.conch.test.test_telnet -*- | |
2 # Copyright (c) 2001-2007 Twisted Matrix Laboratories. | |
3 # See LICENSE for details. | |
4 | |
5 """ | |
6 Tests for L{twisted.conch.telnet}. | |
7 """ | |
8 | |
9 from zope.interface import implements | |
10 | |
11 from twisted.internet import defer | |
12 | |
13 from twisted.conch import telnet | |
14 | |
15 from twisted.trial import unittest | |
16 from twisted.test import proto_helpers | |
17 | |
18 class TestProtocol: | |
19 implements(telnet.ITelnetProtocol) | |
20 | |
21 localEnableable = () | |
22 remoteEnableable = () | |
23 | |
24 def __init__(self): | |
25 self.bytes = '' | |
26 self.subcmd = '' | |
27 self.calls = [] | |
28 | |
29 self.enabledLocal = [] | |
30 self.enabledRemote = [] | |
31 self.disabledLocal = [] | |
32 self.disabledRemote = [] | |
33 | |
34 def makeConnection(self, transport): | |
35 d = transport.negotiationMap = {} | |
36 d['\x12'] = self.neg_TEST_COMMAND | |
37 | |
38 d = transport.commandMap = transport.commandMap.copy() | |
39 for cmd in ('NOP', 'DM', 'BRK', 'IP', 'AO', 'AYT', 'EC', 'EL', 'GA'): | |
40 d[getattr(telnet, cmd)] = lambda arg, cmd=cmd: self.calls.append(cmd
) | |
41 | |
42 def dataReceived(self, bytes): | |
43 self.bytes += bytes | |
44 | |
45 def connectionLost(self, reason): | |
46 pass | |
47 | |
48 def neg_TEST_COMMAND(self, payload): | |
49 self.subcmd = payload | |
50 | |
51 def enableLocal(self, option): | |
52 if option in self.localEnableable: | |
53 self.enabledLocal.append(option) | |
54 return True | |
55 return False | |
56 | |
57 def disableLocal(self, option): | |
58 self.disabledLocal.append(option) | |
59 | |
60 def enableRemote(self, option): | |
61 if option in self.remoteEnableable: | |
62 self.enabledRemote.append(option) | |
63 return True | |
64 return False | |
65 | |
66 def disableRemote(self, option): | |
67 self.disabledRemote.append(option) | |
68 | |
69 | |
70 | |
71 class TelnetTransportTestCase(unittest.TestCase): | |
72 """ | |
73 Tests for L{telnet.TelnetTransport}. | |
74 """ | |
75 def setUp(self): | |
76 self.p = telnet.TelnetTransport(TestProtocol) | |
77 self.t = proto_helpers.StringTransport() | |
78 self.p.makeConnection(self.t) | |
79 | |
80 def testRegularBytes(self): | |
81 # Just send a bunch of bytes. None of these do anything | |
82 # with telnet. They should pass right through to the | |
83 # application layer. | |
84 h = self.p.protocol | |
85 | |
86 L = ["here are some bytes la la la", | |
87 "some more arrive here", | |
88 "lots of bytes to play with", | |
89 "la la la", | |
90 "ta de da", | |
91 "dum"] | |
92 for b in L: | |
93 self.p.dataReceived(b) | |
94 | |
95 self.assertEquals(h.bytes, ''.join(L)) | |
96 | |
97 def testNewlineHandling(self): | |
98 # Send various kinds of newlines and make sure they get translated | |
99 # into \n. | |
100 h = self.p.protocol | |
101 | |
102 L = ["here is the first line\r\n", | |
103 "here is the second line\r\0", | |
104 "here is the third line\r\n", | |
105 "here is the last line\r\0"] | |
106 | |
107 for b in L: | |
108 self.p.dataReceived(b) | |
109 | |
110 self.assertEquals(h.bytes, L[0][:-2] + '\n' + | |
111 L[1][:-2] + '\r' + | |
112 L[2][:-2] + '\n' + | |
113 L[3][:-2] + '\r') | |
114 | |
115 def testIACEscape(self): | |
116 # Send a bunch of bytes and a couple quoted \xFFs. Unquoted, | |
117 # \xFF is a telnet command. Quoted, one of them from each pair | |
118 # should be passed through to the application layer. | |
119 h = self.p.protocol | |
120 | |
121 L = ["here are some bytes\xff\xff with an embedded IAC", | |
122 "and here is a test of a border escape\xff", | |
123 "\xff did you get that IAC?"] | |
124 | |
125 for b in L: | |
126 self.p.dataReceived(b) | |
127 | |
128 self.assertEquals(h.bytes, ''.join(L).replace('\xff\xff', '\xff')) | |
129 | |
130 def _simpleCommandTest(self, cmdName): | |
131 # Send a single simple telnet command and make sure | |
132 # it gets noticed and the appropriate method gets | |
133 # called. | |
134 h = self.p.protocol | |
135 | |
136 cmd = telnet.IAC + getattr(telnet, cmdName) | |
137 L = ["Here's some bytes, tra la la", | |
138 "But ono!" + cmd + " an interrupt"] | |
139 | |
140 for b in L: | |
141 self.p.dataReceived(b) | |
142 | |
143 self.assertEquals(h.calls, [cmdName]) | |
144 self.assertEquals(h.bytes, ''.join(L).replace(cmd, '')) | |
145 | |
146 def testInterrupt(self): | |
147 self._simpleCommandTest("IP") | |
148 | |
149 def testNoOperation(self): | |
150 self._simpleCommandTest("NOP") | |
151 | |
152 def testDataMark(self): | |
153 self._simpleCommandTest("DM") | |
154 | |
155 def testBreak(self): | |
156 self._simpleCommandTest("BRK") | |
157 | |
158 def testAbortOutput(self): | |
159 self._simpleCommandTest("AO") | |
160 | |
161 def testAreYouThere(self): | |
162 self._simpleCommandTest("AYT") | |
163 | |
164 def testEraseCharacter(self): | |
165 self._simpleCommandTest("EC") | |
166 | |
167 def testEraseLine(self): | |
168 self._simpleCommandTest("EL") | |
169 | |
170 def testGoAhead(self): | |
171 self._simpleCommandTest("GA") | |
172 | |
173 def testSubnegotiation(self): | |
174 # Send a subnegotiation command and make sure it gets | |
175 # parsed and that the correct method is called. | |
176 h = self.p.protocol | |
177 | |
178 cmd = telnet.IAC + telnet.SB + '\x12hello world' + telnet.IAC + telnet.S
E | |
179 L = ["These are some bytes but soon" + cmd, | |
180 "there will be some more"] | |
181 | |
182 for b in L: | |
183 self.p.dataReceived(b) | |
184 | |
185 self.assertEquals(h.bytes, ''.join(L).replace(cmd, '')) | |
186 self.assertEquals(h.subcmd, list("hello world")) | |
187 | |
188 def testSubnegotiationWithEmbeddedSE(self): | |
189 # Send a subnegotiation command with an embedded SE. Make sure | |
190 # that SE gets passed to the correct method. | |
191 h = self.p.protocol | |
192 | |
193 cmd = (telnet.IAC + telnet.SB + | |
194 '\x12' + telnet.SE + | |
195 telnet.IAC + telnet.SE) | |
196 | |
197 L = ["Some bytes are here" + cmd + "and here", | |
198 "and here"] | |
199 | |
200 for b in L: | |
201 self.p.dataReceived(b) | |
202 | |
203 self.assertEquals(h.bytes, ''.join(L).replace(cmd, '')) | |
204 self.assertEquals(h.subcmd, [telnet.SE]) | |
205 | |
206 def testBoundarySubnegotiation(self): | |
207 # Send a subnegotiation command. Split it at every possible byte bounda
ry | |
208 # and make sure it always gets parsed and that it is passed to the corre
ct | |
209 # method. | |
210 cmd = (telnet.IAC + telnet.SB + | |
211 '\x12' + telnet.SE + 'hello' + | |
212 telnet.IAC + telnet.SE) | |
213 | |
214 for i in range(len(cmd)): | |
215 h = self.p.protocol = TestProtocol() | |
216 h.makeConnection(self.p) | |
217 | |
218 a, b = cmd[:i], cmd[i:] | |
219 L = ["first part" + a, | |
220 b + "last part"] | |
221 | |
222 for bytes in L: | |
223 self.p.dataReceived(bytes) | |
224 | |
225 self.assertEquals(h.bytes, ''.join(L).replace(cmd, '')) | |
226 self.assertEquals(h.subcmd, [telnet.SE] + list('hello')) | |
227 | |
228 def _enabledHelper(self, o, eL=[], eR=[], dL=[], dR=[]): | |
229 self.assertEquals(o.enabledLocal, eL) | |
230 self.assertEquals(o.enabledRemote, eR) | |
231 self.assertEquals(o.disabledLocal, dL) | |
232 self.assertEquals(o.disabledRemote, dR) | |
233 | |
234 def testRefuseWill(self): | |
235 # Try to enable an option. The server should refuse to enable it. | |
236 cmd = telnet.IAC + telnet.WILL + '\x12' | |
237 | |
238 bytes = "surrounding bytes" + cmd + "to spice things up" | |
239 self.p.dataReceived(bytes) | |
240 | |
241 self.assertEquals(self.p.protocol.bytes, bytes.replace(cmd, '')) | |
242 self.assertEquals(self.t.value(), telnet.IAC + telnet.DONT + '\x12') | |
243 self._enabledHelper(self.p.protocol) | |
244 | |
245 def testRefuseDo(self): | |
246 # Try to enable an option. The server should refuse to enable it. | |
247 cmd = telnet.IAC + telnet.DO + '\x12' | |
248 | |
249 bytes = "surrounding bytes" + cmd + "to spice things up" | |
250 self.p.dataReceived(bytes) | |
251 | |
252 self.assertEquals(self.p.protocol.bytes, bytes.replace(cmd, '')) | |
253 self.assertEquals(self.t.value(), telnet.IAC + telnet.WONT + '\x12') | |
254 self._enabledHelper(self.p.protocol) | |
255 | |
256 def testAcceptDo(self): | |
257 # Try to enable an option. The option is in our allowEnable | |
258 # list, so we will allow it to be enabled. | |
259 cmd = telnet.IAC + telnet.DO + '\x19' | |
260 bytes = 'padding' + cmd + 'trailer' | |
261 | |
262 h = self.p.protocol | |
263 h.localEnableable = ('\x19',) | |
264 self.p.dataReceived(bytes) | |
265 | |
266 self.assertEquals(self.t.value(), telnet.IAC + telnet.WILL + '\x19') | |
267 self._enabledHelper(h, eL=['\x19']) | |
268 | |
269 def testAcceptWill(self): | |
270 # Same as testAcceptDo, but reversed. | |
271 cmd = telnet.IAC + telnet.WILL + '\x91' | |
272 bytes = 'header' + cmd + 'padding' | |
273 | |
274 h = self.p.protocol | |
275 h.remoteEnableable = ('\x91',) | |
276 self.p.dataReceived(bytes) | |
277 | |
278 self.assertEquals(self.t.value(), telnet.IAC + telnet.DO + '\x91') | |
279 self._enabledHelper(h, eR=['\x91']) | |
280 | |
281 def testAcceptWont(self): | |
282 # Try to disable an option. The server must allow any option to | |
283 # be disabled at any time. Make sure it disables it and sends | |
284 # back an acknowledgement of this. | |
285 cmd = telnet.IAC + telnet.WONT + '\x29' | |
286 | |
287 # Jimmy it - after these two lines, the server will be in a state | |
288 # such that it believes the option to have been previously enabled | |
289 # via normal negotiation. | |
290 s = self.p.getOptionState('\x29') | |
291 s.him.state = 'yes' | |
292 | |
293 bytes = "fiddle dee" + cmd | |
294 self.p.dataReceived(bytes) | |
295 | |
296 self.assertEquals(self.p.protocol.bytes, bytes.replace(cmd, '')) | |
297 self.assertEquals(self.t.value(), telnet.IAC + telnet.DONT + '\x29') | |
298 self.assertEquals(s.him.state, 'no') | |
299 self._enabledHelper(self.p.protocol, dR=['\x29']) | |
300 | |
301 def testAcceptDont(self): | |
302 # Try to disable an option. The server must allow any option to | |
303 # be disabled at any time. Make sure it disables it and sends | |
304 # back an acknowledgement of this. | |
305 cmd = telnet.IAC + telnet.DONT + '\x29' | |
306 | |
307 # Jimmy it - after these two lines, the server will be in a state | |
308 # such that it believes the option to have beenp previously enabled | |
309 # via normal negotiation. | |
310 s = self.p.getOptionState('\x29') | |
311 s.us.state = 'yes' | |
312 | |
313 bytes = "fiddle dum " + cmd | |
314 self.p.dataReceived(bytes) | |
315 | |
316 self.assertEquals(self.p.protocol.bytes, bytes.replace(cmd, '')) | |
317 self.assertEquals(self.t.value(), telnet.IAC + telnet.WONT + '\x29') | |
318 self.assertEquals(s.us.state, 'no') | |
319 self._enabledHelper(self.p.protocol, dL=['\x29']) | |
320 | |
321 def testIgnoreWont(self): | |
322 # Try to disable an option. The option is already disabled. The | |
323 # server should send nothing in response to this. | |
324 cmd = telnet.IAC + telnet.WONT + '\x47' | |
325 | |
326 bytes = "dum de dum" + cmd + "tra la la" | |
327 self.p.dataReceived(bytes) | |
328 | |
329 self.assertEquals(self.p.protocol.bytes, bytes.replace(cmd, '')) | |
330 self.assertEquals(self.t.value(), '') | |
331 self._enabledHelper(self.p.protocol) | |
332 | |
333 def testIgnoreDont(self): | |
334 # Try to disable an option. The option is already disabled. The | |
335 # server should send nothing in response to this. Doing so could | |
336 # lead to a negotiation loop. | |
337 cmd = telnet.IAC + telnet.DONT + '\x47' | |
338 | |
339 bytes = "dum de dum" + cmd + "tra la la" | |
340 self.p.dataReceived(bytes) | |
341 | |
342 self.assertEquals(self.p.protocol.bytes, bytes.replace(cmd, '')) | |
343 self.assertEquals(self.t.value(), '') | |
344 self._enabledHelper(self.p.protocol) | |
345 | |
346 def testIgnoreWill(self): | |
347 # Try to enable an option. The option is already enabled. The | |
348 # server should send nothing in response to this. Doing so could | |
349 # lead to a negotiation loop. | |
350 cmd = telnet.IAC + telnet.WILL + '\x56' | |
351 | |
352 # Jimmy it - after these two lines, the server will be in a state | |
353 # such that it believes the option to have been previously enabled | |
354 # via normal negotiation. | |
355 s = self.p.getOptionState('\x56') | |
356 s.him.state = 'yes' | |
357 | |
358 bytes = "tra la la" + cmd + "dum de dum" | |
359 self.p.dataReceived(bytes) | |
360 | |
361 self.assertEquals(self.p.protocol.bytes, bytes.replace(cmd, '')) | |
362 self.assertEquals(self.t.value(), '') | |
363 self._enabledHelper(self.p.protocol) | |
364 | |
365 def testIgnoreDo(self): | |
366 # Try to enable an option. The option is already enabled. The | |
367 # server should send nothing in response to this. Doing so could | |
368 # lead to a negotiation loop. | |
369 cmd = telnet.IAC + telnet.DO + '\x56' | |
370 | |
371 # Jimmy it - after these two lines, the server will be in a state | |
372 # such that it believes the option to have been previously enabled | |
373 # via normal negotiation. | |
374 s = self.p.getOptionState('\x56') | |
375 s.us.state = 'yes' | |
376 | |
377 bytes = "tra la la" + cmd + "dum de dum" | |
378 self.p.dataReceived(bytes) | |
379 | |
380 self.assertEquals(self.p.protocol.bytes, bytes.replace(cmd, '')) | |
381 self.assertEquals(self.t.value(), '') | |
382 self._enabledHelper(self.p.protocol) | |
383 | |
384 def testAcceptedEnableRequest(self): | |
385 # Try to enable an option through the user-level API. This | |
386 # returns a Deferred that fires when negotiation about the option | |
387 # finishes. Make sure it fires, make sure state gets updated | |
388 # properly, make sure the result indicates the option was enabled. | |
389 d = self.p.do('\x42') | |
390 | |
391 h = self.p.protocol | |
392 h.remoteEnableable = ('\x42',) | |
393 | |
394 self.assertEquals(self.t.value(), telnet.IAC + telnet.DO + '\x42') | |
395 | |
396 self.p.dataReceived(telnet.IAC + telnet.WILL + '\x42') | |
397 | |
398 d.addCallback(self.assertEquals, True) | |
399 d.addCallback(lambda _: self._enabledHelper(h, eR=['\x42'])) | |
400 return d | |
401 | |
402 def testRefusedEnableRequest(self): | |
403 # Try to enable an option through the user-level API. This | |
404 # returns a Deferred that fires when negotiation about the option | |
405 # finishes. Make sure it fires, make sure state gets updated | |
406 # properly, make sure the result indicates the option was enabled. | |
407 d = self.p.do('\x42') | |
408 | |
409 self.assertEquals(self.t.value(), telnet.IAC + telnet.DO + '\x42') | |
410 | |
411 self.p.dataReceived(telnet.IAC + telnet.WONT + '\x42') | |
412 | |
413 d = self.assertFailure(d, telnet.OptionRefused) | |
414 d.addCallback(lambda _: self._enabledHelper(self.p.protocol)) | |
415 return d | |
416 | |
417 def testAcceptedDisableRequest(self): | |
418 # Try to disable an option through the user-level API. This | |
419 # returns a Deferred that fires when negotiation about the option | |
420 # finishes. Make sure it fires, make sure state gets updated | |
421 # properly, make sure the result indicates the option was enabled. | |
422 s = self.p.getOptionState('\x42') | |
423 s.him.state = 'yes' | |
424 | |
425 d = self.p.dont('\x42') | |
426 | |
427 self.assertEquals(self.t.value(), telnet.IAC + telnet.DONT + '\x42') | |
428 | |
429 self.p.dataReceived(telnet.IAC + telnet.WONT + '\x42') | |
430 | |
431 d.addCallback(self.assertEquals, True) | |
432 d.addCallback(lambda _: self._enabledHelper(self.p.protocol, | |
433 dR=['\x42'])) | |
434 return d | |
435 | |
436 def testNegotiationBlocksFurtherNegotiation(self): | |
437 # Try to disable an option, then immediately try to enable it, then | |
438 # immediately try to disable it. Ensure that the 2nd and 3rd calls | |
439 # fail quickly with the right exception. | |
440 s = self.p.getOptionState('\x24') | |
441 s.him.state = 'yes' | |
442 d2 = self.p.dont('\x24') # fires after the first line of _final | |
443 | |
444 def _do(x): | |
445 d = self.p.do('\x24') | |
446 return self.assertFailure(d, telnet.AlreadyNegotiating) | |
447 | |
448 def _dont(x): | |
449 d = self.p.dont('\x24') | |
450 return self.assertFailure(d, telnet.AlreadyNegotiating) | |
451 | |
452 def _final(x): | |
453 self.p.dataReceived(telnet.IAC + telnet.WONT + '\x24') | |
454 # an assertion that only passes if d2 has fired | |
455 self._enabledHelper(self.p.protocol, dR=['\x24']) | |
456 # Make sure we allow this | |
457 self.p.protocol.remoteEnableable = ('\x24',) | |
458 d = self.p.do('\x24') | |
459 self.p.dataReceived(telnet.IAC + telnet.WILL + '\x24') | |
460 d.addCallback(self.assertEquals, True) | |
461 d.addCallback(lambda _: self._enabledHelper(self.p.protocol, | |
462 eR=['\x24'], | |
463 dR=['\x24'])) | |
464 return d | |
465 | |
466 d = _do(None) | |
467 d.addCallback(_dont) | |
468 d.addCallback(_final) | |
469 return d | |
470 | |
471 def testSuperfluousDisableRequestRaises(self): | |
472 # Try to disable a disabled option. Make sure it fails properly. | |
473 d = self.p.dont('\xab') | |
474 return self.assertFailure(d, telnet.AlreadyDisabled) | |
475 | |
476 def testSuperfluousEnableRequestRaises(self): | |
477 # Try to disable a disabled option. Make sure it fails properly. | |
478 s = self.p.getOptionState('\xab') | |
479 s.him.state = 'yes' | |
480 d = self.p.do('\xab') | |
481 return self.assertFailure(d, telnet.AlreadyEnabled) | |
482 | |
483 def testLostConnectionFailsDeferreds(self): | |
484 d1 = self.p.do('\x12') | |
485 d2 = self.p.do('\x23') | |
486 d3 = self.p.do('\x34') | |
487 | |
488 class TestException(Exception): | |
489 pass | |
490 | |
491 self.p.connectionLost(TestException("Total failure!")) | |
492 | |
493 d1 = self.assertFailure(d1, TestException) | |
494 d2 = self.assertFailure(d2, TestException) | |
495 d3 = self.assertFailure(d3, TestException) | |
496 return defer.gatherResults([d1, d2, d3]) | |
497 | |
498 | |
499 class TestTelnet(telnet.Telnet): | |
500 """ | |
501 A trivial extension of the telnet protocol class useful to unit tests. | |
502 """ | |
503 def __init__(self): | |
504 telnet.Telnet.__init__(self) | |
505 self.events = [] | |
506 | |
507 | |
508 def applicationDataReceived(self, bytes): | |
509 """ | |
510 Record the given data in C{self.events}. | |
511 """ | |
512 self.events.append(('bytes', bytes)) | |
513 | |
514 | |
515 def unhandledCommand(self, command, bytes): | |
516 """ | |
517 Record the given command in C{self.events}. | |
518 """ | |
519 self.events.append(('command', command, bytes)) | |
520 | |
521 | |
522 def unhandledSubnegotiation(self, command, bytes): | |
523 """ | |
524 Record the given subnegotiation command in C{self.events}. | |
525 """ | |
526 self.events.append(('negotiate', command, bytes)) | |
527 | |
528 | |
529 | |
530 class TelnetTests(unittest.TestCase): | |
531 """ | |
532 Tests for L{telnet.Telnet}. | |
533 | |
534 L{telnet.Telnet} implements the TELNET protocol (RFC 854), including option | |
535 and suboption negotiation, and option state tracking. | |
536 """ | |
537 def setUp(self): | |
538 """ | |
539 Create an unconnected L{telnet.Telnet} to be used by tests. | |
540 """ | |
541 self.protocol = TestTelnet() | |
542 | |
543 | |
544 def test_enableLocal(self): | |
545 """ | |
546 L{telnet.Telnet.enableLocal} should reject all options, since | |
547 L{telnet.Telnet} does not know how to implement any options. | |
548 """ | |
549 self.assertFalse(self.protocol.enableLocal('\0')) | |
550 | |
551 | |
552 def test_enableRemote(self): | |
553 """ | |
554 L{telnet.Telnet.enableRemote} should reject all options, since | |
555 L{telnet.Telnet} does not know how to implement any options. | |
556 """ | |
557 self.assertFalse(self.protocol.enableRemote('\0')) | |
558 | |
559 | |
560 def test_disableLocal(self): | |
561 """ | |
562 It is an error for L{telnet.Telnet.disableLocal} to be called, since | |
563 L{telnet.Telnet.enableLocal} will never allow any options to be enabled | |
564 locally. If a subclass overrides enableLocal, it must also override | |
565 disableLocal. | |
566 """ | |
567 self.assertRaises(NotImplementedError, self.protocol.disableLocal, '\0') | |
568 | |
569 | |
570 def test_disableRemote(self): | |
571 """ | |
572 It is an error for L{telnet.Telnet.disableRemote} to be called, since | |
573 L{telnet.Telnet.enableRemote} will never allow any options to be | |
574 enabled remotely. If a subclass overrides enableRemote, it must also | |
575 override disableRemote. | |
576 """ | |
577 self.assertRaises(NotImplementedError, self.protocol.disableRemote, '\0'
) | |
578 | |
579 | |
580 def _deliver(self, bytes, *expected): | |
581 """ | |
582 Pass the given bytes to the protocol's C{dataReceived} method and | |
583 assert that the given events occur. | |
584 """ | |
585 received = self.protocol.events = [] | |
586 self.protocol.dataReceived(bytes) | |
587 self.assertEqual(received, list(expected)) | |
588 | |
589 | |
590 def test_oneApplicationDataByte(self): | |
591 """ | |
592 One application-data byte in the default state gets delivered right | |
593 away. | |
594 """ | |
595 self._deliver('a', ('bytes', 'a')) | |
596 | |
597 | |
598 def test_twoApplicationDataBytes(self): | |
599 """ | |
600 Two application-data bytes in the default state get delivered | |
601 together. | |
602 """ | |
603 self._deliver('bc', ('bytes', 'bc')) | |
604 | |
605 | |
606 def test_threeApplicationDataBytes(self): | |
607 """ | |
608 Three application-data bytes followed by a control byte get | |
609 delivered, but the control byte doesn't. | |
610 """ | |
611 self._deliver('def' + telnet.IAC, ('bytes', 'def')) | |
612 | |
613 | |
614 def test_escapedControl(self): | |
615 """ | |
616 IAC in the escaped state gets delivered and so does another | |
617 application-data byte following it. | |
618 """ | |
619 self._deliver(telnet.IAC) | |
620 self._deliver(telnet.IAC + 'g', ('bytes', telnet.IAC + 'g')) | |
621 | |
622 | |
623 def test_carriageReturn(self): | |
624 """ | |
625 A carriage return only puts the protocol into the newline state. A | |
626 linefeed in the newline state causes just the newline to be | |
627 delivered. A nul in the newline state causes a carriage return to | |
628 be delivered. Anything else causes a carriage return and that thing | |
629 to be delivered. | |
630 """ | |
631 self._deliver('\r') | |
632 self._deliver('\n', ('bytes', '\n')) | |
633 self._deliver('\r\n', ('bytes', '\n')) | |
634 | |
635 self._deliver('\r') | |
636 self._deliver('\0', ('bytes', '\r')) | |
637 self._deliver('\r\0', ('bytes', '\r')) | |
638 | |
639 self._deliver('\r') | |
640 self._deliver('a', ('bytes', '\ra')) | |
641 self._deliver('\ra', ('bytes', '\ra')) | |
642 | |
643 | |
644 def test_applicationDataBeforeSimpleCommand(self): | |
645 """ | |
646 Application bytes received before a command are delivered before the | |
647 command is processed. | |
648 """ | |
649 self._deliver( | |
650 'x' + telnet.IAC + telnet.NOP, | |
651 ('bytes', 'x'), ('command', telnet.NOP, None)) | |
652 | |
653 | |
654 def test_applicationDataBeforeCommand(self): | |
655 """ | |
656 Application bytes received before a WILL/WONT/DO/DONT are delivered | |
657 before the command is processed. | |
658 """ | |
659 self.protocol.commandMap = {} | |
660 self._deliver( | |
661 'y' + telnet.IAC + telnet.WILL + '\x00', | |
662 ('bytes', 'y'), ('command', telnet.WILL, '\x00')) | |
663 | |
664 | |
665 def test_applicationDataBeforeSubnegotiation(self): | |
666 """ | |
667 Application bytes received before a subnegotiation command are | |
668 delivered before the negotiation is processed. | |
669 """ | |
670 self._deliver( | |
671 'z' + telnet.IAC + telnet.SB + 'Qx' + telnet.IAC + telnet.SE, | |
672 ('bytes', 'z'), ('negotiate', 'Q', ['x'])) | |
OLD | NEW |