| OLD | NEW |
| (Empty) |
| 1 # -*- test-case-name: twisted.conch.test.test_telnet -*- | |
| 2 # Copyright (c) 2001-2007 Twisted Matrix Laboratories. | |
| 3 # See LICENSE for details. | |
| 4 | |
| 5 """ | |
| 6 Tests for L{twisted.conch.telnet}. | |
| 7 """ | |
| 8 | |
| 9 from zope.interface import implements | |
| 10 | |
| 11 from twisted.internet import defer | |
| 12 | |
| 13 from twisted.conch import telnet | |
| 14 | |
| 15 from twisted.trial import unittest | |
| 16 from twisted.test import proto_helpers | |
| 17 | |
| 18 class TestProtocol: | |
| 19 implements(telnet.ITelnetProtocol) | |
| 20 | |
| 21 localEnableable = () | |
| 22 remoteEnableable = () | |
| 23 | |
| 24 def __init__(self): | |
| 25 self.bytes = '' | |
| 26 self.subcmd = '' | |
| 27 self.calls = [] | |
| 28 | |
| 29 self.enabledLocal = [] | |
| 30 self.enabledRemote = [] | |
| 31 self.disabledLocal = [] | |
| 32 self.disabledRemote = [] | |
| 33 | |
| 34 def makeConnection(self, transport): | |
| 35 d = transport.negotiationMap = {} | |
| 36 d['\x12'] = self.neg_TEST_COMMAND | |
| 37 | |
| 38 d = transport.commandMap = transport.commandMap.copy() | |
| 39 for cmd in ('NOP', 'DM', 'BRK', 'IP', 'AO', 'AYT', 'EC', 'EL', 'GA'): | |
| 40 d[getattr(telnet, cmd)] = lambda arg, cmd=cmd: self.calls.append(cmd
) | |
| 41 | |
| 42 def dataReceived(self, bytes): | |
| 43 self.bytes += bytes | |
| 44 | |
| 45 def connectionLost(self, reason): | |
| 46 pass | |
| 47 | |
| 48 def neg_TEST_COMMAND(self, payload): | |
| 49 self.subcmd = payload | |
| 50 | |
| 51 def enableLocal(self, option): | |
| 52 if option in self.localEnableable: | |
| 53 self.enabledLocal.append(option) | |
| 54 return True | |
| 55 return False | |
| 56 | |
| 57 def disableLocal(self, option): | |
| 58 self.disabledLocal.append(option) | |
| 59 | |
| 60 def enableRemote(self, option): | |
| 61 if option in self.remoteEnableable: | |
| 62 self.enabledRemote.append(option) | |
| 63 return True | |
| 64 return False | |
| 65 | |
| 66 def disableRemote(self, option): | |
| 67 self.disabledRemote.append(option) | |
| 68 | |
| 69 | |
| 70 | |
| 71 class TelnetTransportTestCase(unittest.TestCase): | |
| 72 """ | |
| 73 Tests for L{telnet.TelnetTransport}. | |
| 74 """ | |
| 75 def setUp(self): | |
| 76 self.p = telnet.TelnetTransport(TestProtocol) | |
| 77 self.t = proto_helpers.StringTransport() | |
| 78 self.p.makeConnection(self.t) | |
| 79 | |
| 80 def testRegularBytes(self): | |
| 81 # Just send a bunch of bytes. None of these do anything | |
| 82 # with telnet. They should pass right through to the | |
| 83 # application layer. | |
| 84 h = self.p.protocol | |
| 85 | |
| 86 L = ["here are some bytes la la la", | |
| 87 "some more arrive here", | |
| 88 "lots of bytes to play with", | |
| 89 "la la la", | |
| 90 "ta de da", | |
| 91 "dum"] | |
| 92 for b in L: | |
| 93 self.p.dataReceived(b) | |
| 94 | |
| 95 self.assertEquals(h.bytes, ''.join(L)) | |
| 96 | |
| 97 def testNewlineHandling(self): | |
| 98 # Send various kinds of newlines and make sure they get translated | |
| 99 # into \n. | |
| 100 h = self.p.protocol | |
| 101 | |
| 102 L = ["here is the first line\r\n", | |
| 103 "here is the second line\r\0", | |
| 104 "here is the third line\r\n", | |
| 105 "here is the last line\r\0"] | |
| 106 | |
| 107 for b in L: | |
| 108 self.p.dataReceived(b) | |
| 109 | |
| 110 self.assertEquals(h.bytes, L[0][:-2] + '\n' + | |
| 111 L[1][:-2] + '\r' + | |
| 112 L[2][:-2] + '\n' + | |
| 113 L[3][:-2] + '\r') | |
| 114 | |
| 115 def testIACEscape(self): | |
| 116 # Send a bunch of bytes and a couple quoted \xFFs. Unquoted, | |
| 117 # \xFF is a telnet command. Quoted, one of them from each pair | |
| 118 # should be passed through to the application layer. | |
| 119 h = self.p.protocol | |
| 120 | |
| 121 L = ["here are some bytes\xff\xff with an embedded IAC", | |
| 122 "and here is a test of a border escape\xff", | |
| 123 "\xff did you get that IAC?"] | |
| 124 | |
| 125 for b in L: | |
| 126 self.p.dataReceived(b) | |
| 127 | |
| 128 self.assertEquals(h.bytes, ''.join(L).replace('\xff\xff', '\xff')) | |
| 129 | |
| 130 def _simpleCommandTest(self, cmdName): | |
| 131 # Send a single simple telnet command and make sure | |
| 132 # it gets noticed and the appropriate method gets | |
| 133 # called. | |
| 134 h = self.p.protocol | |
| 135 | |
| 136 cmd = telnet.IAC + getattr(telnet, cmdName) | |
| 137 L = ["Here's some bytes, tra la la", | |
| 138 "But ono!" + cmd + " an interrupt"] | |
| 139 | |
| 140 for b in L: | |
| 141 self.p.dataReceived(b) | |
| 142 | |
| 143 self.assertEquals(h.calls, [cmdName]) | |
| 144 self.assertEquals(h.bytes, ''.join(L).replace(cmd, '')) | |
| 145 | |
| 146 def testInterrupt(self): | |
| 147 self._simpleCommandTest("IP") | |
| 148 | |
| 149 def testNoOperation(self): | |
| 150 self._simpleCommandTest("NOP") | |
| 151 | |
| 152 def testDataMark(self): | |
| 153 self._simpleCommandTest("DM") | |
| 154 | |
| 155 def testBreak(self): | |
| 156 self._simpleCommandTest("BRK") | |
| 157 | |
| 158 def testAbortOutput(self): | |
| 159 self._simpleCommandTest("AO") | |
| 160 | |
| 161 def testAreYouThere(self): | |
| 162 self._simpleCommandTest("AYT") | |
| 163 | |
| 164 def testEraseCharacter(self): | |
| 165 self._simpleCommandTest("EC") | |
| 166 | |
| 167 def testEraseLine(self): | |
| 168 self._simpleCommandTest("EL") | |
| 169 | |
| 170 def testGoAhead(self): | |
| 171 self._simpleCommandTest("GA") | |
| 172 | |
| 173 def testSubnegotiation(self): | |
| 174 # Send a subnegotiation command and make sure it gets | |
| 175 # parsed and that the correct method is called. | |
| 176 h = self.p.protocol | |
| 177 | |
| 178 cmd = telnet.IAC + telnet.SB + '\x12hello world' + telnet.IAC + telnet.S
E | |
| 179 L = ["These are some bytes but soon" + cmd, | |
| 180 "there will be some more"] | |
| 181 | |
| 182 for b in L: | |
| 183 self.p.dataReceived(b) | |
| 184 | |
| 185 self.assertEquals(h.bytes, ''.join(L).replace(cmd, '')) | |
| 186 self.assertEquals(h.subcmd, list("hello world")) | |
| 187 | |
| 188 def testSubnegotiationWithEmbeddedSE(self): | |
| 189 # Send a subnegotiation command with an embedded SE. Make sure | |
| 190 # that SE gets passed to the correct method. | |
| 191 h = self.p.protocol | |
| 192 | |
| 193 cmd = (telnet.IAC + telnet.SB + | |
| 194 '\x12' + telnet.SE + | |
| 195 telnet.IAC + telnet.SE) | |
| 196 | |
| 197 L = ["Some bytes are here" + cmd + "and here", | |
| 198 "and here"] | |
| 199 | |
| 200 for b in L: | |
| 201 self.p.dataReceived(b) | |
| 202 | |
| 203 self.assertEquals(h.bytes, ''.join(L).replace(cmd, '')) | |
| 204 self.assertEquals(h.subcmd, [telnet.SE]) | |
| 205 | |
| 206 def testBoundarySubnegotiation(self): | |
| 207 # Send a subnegotiation command. Split it at every possible byte bounda
ry | |
| 208 # and make sure it always gets parsed and that it is passed to the corre
ct | |
| 209 # method. | |
| 210 cmd = (telnet.IAC + telnet.SB + | |
| 211 '\x12' + telnet.SE + 'hello' + | |
| 212 telnet.IAC + telnet.SE) | |
| 213 | |
| 214 for i in range(len(cmd)): | |
| 215 h = self.p.protocol = TestProtocol() | |
| 216 h.makeConnection(self.p) | |
| 217 | |
| 218 a, b = cmd[:i], cmd[i:] | |
| 219 L = ["first part" + a, | |
| 220 b + "last part"] | |
| 221 | |
| 222 for bytes in L: | |
| 223 self.p.dataReceived(bytes) | |
| 224 | |
| 225 self.assertEquals(h.bytes, ''.join(L).replace(cmd, '')) | |
| 226 self.assertEquals(h.subcmd, [telnet.SE] + list('hello')) | |
| 227 | |
| 228 def _enabledHelper(self, o, eL=[], eR=[], dL=[], dR=[]): | |
| 229 self.assertEquals(o.enabledLocal, eL) | |
| 230 self.assertEquals(o.enabledRemote, eR) | |
| 231 self.assertEquals(o.disabledLocal, dL) | |
| 232 self.assertEquals(o.disabledRemote, dR) | |
| 233 | |
| 234 def testRefuseWill(self): | |
| 235 # Try to enable an option. The server should refuse to enable it. | |
| 236 cmd = telnet.IAC + telnet.WILL + '\x12' | |
| 237 | |
| 238 bytes = "surrounding bytes" + cmd + "to spice things up" | |
| 239 self.p.dataReceived(bytes) | |
| 240 | |
| 241 self.assertEquals(self.p.protocol.bytes, bytes.replace(cmd, '')) | |
| 242 self.assertEquals(self.t.value(), telnet.IAC + telnet.DONT + '\x12') | |
| 243 self._enabledHelper(self.p.protocol) | |
| 244 | |
| 245 def testRefuseDo(self): | |
| 246 # Try to enable an option. The server should refuse to enable it. | |
| 247 cmd = telnet.IAC + telnet.DO + '\x12' | |
| 248 | |
| 249 bytes = "surrounding bytes" + cmd + "to spice things up" | |
| 250 self.p.dataReceived(bytes) | |
| 251 | |
| 252 self.assertEquals(self.p.protocol.bytes, bytes.replace(cmd, '')) | |
| 253 self.assertEquals(self.t.value(), telnet.IAC + telnet.WONT + '\x12') | |
| 254 self._enabledHelper(self.p.protocol) | |
| 255 | |
| 256 def testAcceptDo(self): | |
| 257 # Try to enable an option. The option is in our allowEnable | |
| 258 # list, so we will allow it to be enabled. | |
| 259 cmd = telnet.IAC + telnet.DO + '\x19' | |
| 260 bytes = 'padding' + cmd + 'trailer' | |
| 261 | |
| 262 h = self.p.protocol | |
| 263 h.localEnableable = ('\x19',) | |
| 264 self.p.dataReceived(bytes) | |
| 265 | |
| 266 self.assertEquals(self.t.value(), telnet.IAC + telnet.WILL + '\x19') | |
| 267 self._enabledHelper(h, eL=['\x19']) | |
| 268 | |
| 269 def testAcceptWill(self): | |
| 270 # Same as testAcceptDo, but reversed. | |
| 271 cmd = telnet.IAC + telnet.WILL + '\x91' | |
| 272 bytes = 'header' + cmd + 'padding' | |
| 273 | |
| 274 h = self.p.protocol | |
| 275 h.remoteEnableable = ('\x91',) | |
| 276 self.p.dataReceived(bytes) | |
| 277 | |
| 278 self.assertEquals(self.t.value(), telnet.IAC + telnet.DO + '\x91') | |
| 279 self._enabledHelper(h, eR=['\x91']) | |
| 280 | |
| 281 def testAcceptWont(self): | |
| 282 # Try to disable an option. The server must allow any option to | |
| 283 # be disabled at any time. Make sure it disables it and sends | |
| 284 # back an acknowledgement of this. | |
| 285 cmd = telnet.IAC + telnet.WONT + '\x29' | |
| 286 | |
| 287 # Jimmy it - after these two lines, the server will be in a state | |
| 288 # such that it believes the option to have been previously enabled | |
| 289 # via normal negotiation. | |
| 290 s = self.p.getOptionState('\x29') | |
| 291 s.him.state = 'yes' | |
| 292 | |
| 293 bytes = "fiddle dee" + cmd | |
| 294 self.p.dataReceived(bytes) | |
| 295 | |
| 296 self.assertEquals(self.p.protocol.bytes, bytes.replace(cmd, '')) | |
| 297 self.assertEquals(self.t.value(), telnet.IAC + telnet.DONT + '\x29') | |
| 298 self.assertEquals(s.him.state, 'no') | |
| 299 self._enabledHelper(self.p.protocol, dR=['\x29']) | |
| 300 | |
| 301 def testAcceptDont(self): | |
| 302 # Try to disable an option. The server must allow any option to | |
| 303 # be disabled at any time. Make sure it disables it and sends | |
| 304 # back an acknowledgement of this. | |
| 305 cmd = telnet.IAC + telnet.DONT + '\x29' | |
| 306 | |
| 307 # Jimmy it - after these two lines, the server will be in a state | |
| 308 # such that it believes the option to have beenp previously enabled | |
| 309 # via normal negotiation. | |
| 310 s = self.p.getOptionState('\x29') | |
| 311 s.us.state = 'yes' | |
| 312 | |
| 313 bytes = "fiddle dum " + cmd | |
| 314 self.p.dataReceived(bytes) | |
| 315 | |
| 316 self.assertEquals(self.p.protocol.bytes, bytes.replace(cmd, '')) | |
| 317 self.assertEquals(self.t.value(), telnet.IAC + telnet.WONT + '\x29') | |
| 318 self.assertEquals(s.us.state, 'no') | |
| 319 self._enabledHelper(self.p.protocol, dL=['\x29']) | |
| 320 | |
| 321 def testIgnoreWont(self): | |
| 322 # Try to disable an option. The option is already disabled. The | |
| 323 # server should send nothing in response to this. | |
| 324 cmd = telnet.IAC + telnet.WONT + '\x47' | |
| 325 | |
| 326 bytes = "dum de dum" + cmd + "tra la la" | |
| 327 self.p.dataReceived(bytes) | |
| 328 | |
| 329 self.assertEquals(self.p.protocol.bytes, bytes.replace(cmd, '')) | |
| 330 self.assertEquals(self.t.value(), '') | |
| 331 self._enabledHelper(self.p.protocol) | |
| 332 | |
| 333 def testIgnoreDont(self): | |
| 334 # Try to disable an option. The option is already disabled. The | |
| 335 # server should send nothing in response to this. Doing so could | |
| 336 # lead to a negotiation loop. | |
| 337 cmd = telnet.IAC + telnet.DONT + '\x47' | |
| 338 | |
| 339 bytes = "dum de dum" + cmd + "tra la la" | |
| 340 self.p.dataReceived(bytes) | |
| 341 | |
| 342 self.assertEquals(self.p.protocol.bytes, bytes.replace(cmd, '')) | |
| 343 self.assertEquals(self.t.value(), '') | |
| 344 self._enabledHelper(self.p.protocol) | |
| 345 | |
| 346 def testIgnoreWill(self): | |
| 347 # Try to enable an option. The option is already enabled. The | |
| 348 # server should send nothing in response to this. Doing so could | |
| 349 # lead to a negotiation loop. | |
| 350 cmd = telnet.IAC + telnet.WILL + '\x56' | |
| 351 | |
| 352 # Jimmy it - after these two lines, the server will be in a state | |
| 353 # such that it believes the option to have been previously enabled | |
| 354 # via normal negotiation. | |
| 355 s = self.p.getOptionState('\x56') | |
| 356 s.him.state = 'yes' | |
| 357 | |
| 358 bytes = "tra la la" + cmd + "dum de dum" | |
| 359 self.p.dataReceived(bytes) | |
| 360 | |
| 361 self.assertEquals(self.p.protocol.bytes, bytes.replace(cmd, '')) | |
| 362 self.assertEquals(self.t.value(), '') | |
| 363 self._enabledHelper(self.p.protocol) | |
| 364 | |
| 365 def testIgnoreDo(self): | |
| 366 # Try to enable an option. The option is already enabled. The | |
| 367 # server should send nothing in response to this. Doing so could | |
| 368 # lead to a negotiation loop. | |
| 369 cmd = telnet.IAC + telnet.DO + '\x56' | |
| 370 | |
| 371 # Jimmy it - after these two lines, the server will be in a state | |
| 372 # such that it believes the option to have been previously enabled | |
| 373 # via normal negotiation. | |
| 374 s = self.p.getOptionState('\x56') | |
| 375 s.us.state = 'yes' | |
| 376 | |
| 377 bytes = "tra la la" + cmd + "dum de dum" | |
| 378 self.p.dataReceived(bytes) | |
| 379 | |
| 380 self.assertEquals(self.p.protocol.bytes, bytes.replace(cmd, '')) | |
| 381 self.assertEquals(self.t.value(), '') | |
| 382 self._enabledHelper(self.p.protocol) | |
| 383 | |
| 384 def testAcceptedEnableRequest(self): | |
| 385 # Try to enable an option through the user-level API. This | |
| 386 # returns a Deferred that fires when negotiation about the option | |
| 387 # finishes. Make sure it fires, make sure state gets updated | |
| 388 # properly, make sure the result indicates the option was enabled. | |
| 389 d = self.p.do('\x42') | |
| 390 | |
| 391 h = self.p.protocol | |
| 392 h.remoteEnableable = ('\x42',) | |
| 393 | |
| 394 self.assertEquals(self.t.value(), telnet.IAC + telnet.DO + '\x42') | |
| 395 | |
| 396 self.p.dataReceived(telnet.IAC + telnet.WILL + '\x42') | |
| 397 | |
| 398 d.addCallback(self.assertEquals, True) | |
| 399 d.addCallback(lambda _: self._enabledHelper(h, eR=['\x42'])) | |
| 400 return d | |
| 401 | |
| 402 def testRefusedEnableRequest(self): | |
| 403 # Try to enable an option through the user-level API. This | |
| 404 # returns a Deferred that fires when negotiation about the option | |
| 405 # finishes. Make sure it fires, make sure state gets updated | |
| 406 # properly, make sure the result indicates the option was enabled. | |
| 407 d = self.p.do('\x42') | |
| 408 | |
| 409 self.assertEquals(self.t.value(), telnet.IAC + telnet.DO + '\x42') | |
| 410 | |
| 411 self.p.dataReceived(telnet.IAC + telnet.WONT + '\x42') | |
| 412 | |
| 413 d = self.assertFailure(d, telnet.OptionRefused) | |
| 414 d.addCallback(lambda _: self._enabledHelper(self.p.protocol)) | |
| 415 return d | |
| 416 | |
| 417 def testAcceptedDisableRequest(self): | |
| 418 # Try to disable an option through the user-level API. This | |
| 419 # returns a Deferred that fires when negotiation about the option | |
| 420 # finishes. Make sure it fires, make sure state gets updated | |
| 421 # properly, make sure the result indicates the option was enabled. | |
| 422 s = self.p.getOptionState('\x42') | |
| 423 s.him.state = 'yes' | |
| 424 | |
| 425 d = self.p.dont('\x42') | |
| 426 | |
| 427 self.assertEquals(self.t.value(), telnet.IAC + telnet.DONT + '\x42') | |
| 428 | |
| 429 self.p.dataReceived(telnet.IAC + telnet.WONT + '\x42') | |
| 430 | |
| 431 d.addCallback(self.assertEquals, True) | |
| 432 d.addCallback(lambda _: self._enabledHelper(self.p.protocol, | |
| 433 dR=['\x42'])) | |
| 434 return d | |
| 435 | |
| 436 def testNegotiationBlocksFurtherNegotiation(self): | |
| 437 # Try to disable an option, then immediately try to enable it, then | |
| 438 # immediately try to disable it. Ensure that the 2nd and 3rd calls | |
| 439 # fail quickly with the right exception. | |
| 440 s = self.p.getOptionState('\x24') | |
| 441 s.him.state = 'yes' | |
| 442 d2 = self.p.dont('\x24') # fires after the first line of _final | |
| 443 | |
| 444 def _do(x): | |
| 445 d = self.p.do('\x24') | |
| 446 return self.assertFailure(d, telnet.AlreadyNegotiating) | |
| 447 | |
| 448 def _dont(x): | |
| 449 d = self.p.dont('\x24') | |
| 450 return self.assertFailure(d, telnet.AlreadyNegotiating) | |
| 451 | |
| 452 def _final(x): | |
| 453 self.p.dataReceived(telnet.IAC + telnet.WONT + '\x24') | |
| 454 # an assertion that only passes if d2 has fired | |
| 455 self._enabledHelper(self.p.protocol, dR=['\x24']) | |
| 456 # Make sure we allow this | |
| 457 self.p.protocol.remoteEnableable = ('\x24',) | |
| 458 d = self.p.do('\x24') | |
| 459 self.p.dataReceived(telnet.IAC + telnet.WILL + '\x24') | |
| 460 d.addCallback(self.assertEquals, True) | |
| 461 d.addCallback(lambda _: self._enabledHelper(self.p.protocol, | |
| 462 eR=['\x24'], | |
| 463 dR=['\x24'])) | |
| 464 return d | |
| 465 | |
| 466 d = _do(None) | |
| 467 d.addCallback(_dont) | |
| 468 d.addCallback(_final) | |
| 469 return d | |
| 470 | |
| 471 def testSuperfluousDisableRequestRaises(self): | |
| 472 # Try to disable a disabled option. Make sure it fails properly. | |
| 473 d = self.p.dont('\xab') | |
| 474 return self.assertFailure(d, telnet.AlreadyDisabled) | |
| 475 | |
| 476 def testSuperfluousEnableRequestRaises(self): | |
| 477 # Try to disable a disabled option. Make sure it fails properly. | |
| 478 s = self.p.getOptionState('\xab') | |
| 479 s.him.state = 'yes' | |
| 480 d = self.p.do('\xab') | |
| 481 return self.assertFailure(d, telnet.AlreadyEnabled) | |
| 482 | |
| 483 def testLostConnectionFailsDeferreds(self): | |
| 484 d1 = self.p.do('\x12') | |
| 485 d2 = self.p.do('\x23') | |
| 486 d3 = self.p.do('\x34') | |
| 487 | |
| 488 class TestException(Exception): | |
| 489 pass | |
| 490 | |
| 491 self.p.connectionLost(TestException("Total failure!")) | |
| 492 | |
| 493 d1 = self.assertFailure(d1, TestException) | |
| 494 d2 = self.assertFailure(d2, TestException) | |
| 495 d3 = self.assertFailure(d3, TestException) | |
| 496 return defer.gatherResults([d1, d2, d3]) | |
| 497 | |
| 498 | |
| 499 class TestTelnet(telnet.Telnet): | |
| 500 """ | |
| 501 A trivial extension of the telnet protocol class useful to unit tests. | |
| 502 """ | |
| 503 def __init__(self): | |
| 504 telnet.Telnet.__init__(self) | |
| 505 self.events = [] | |
| 506 | |
| 507 | |
| 508 def applicationDataReceived(self, bytes): | |
| 509 """ | |
| 510 Record the given data in C{self.events}. | |
| 511 """ | |
| 512 self.events.append(('bytes', bytes)) | |
| 513 | |
| 514 | |
| 515 def unhandledCommand(self, command, bytes): | |
| 516 """ | |
| 517 Record the given command in C{self.events}. | |
| 518 """ | |
| 519 self.events.append(('command', command, bytes)) | |
| 520 | |
| 521 | |
| 522 def unhandledSubnegotiation(self, command, bytes): | |
| 523 """ | |
| 524 Record the given subnegotiation command in C{self.events}. | |
| 525 """ | |
| 526 self.events.append(('negotiate', command, bytes)) | |
| 527 | |
| 528 | |
| 529 | |
| 530 class TelnetTests(unittest.TestCase): | |
| 531 """ | |
| 532 Tests for L{telnet.Telnet}. | |
| 533 | |
| 534 L{telnet.Telnet} implements the TELNET protocol (RFC 854), including option | |
| 535 and suboption negotiation, and option state tracking. | |
| 536 """ | |
| 537 def setUp(self): | |
| 538 """ | |
| 539 Create an unconnected L{telnet.Telnet} to be used by tests. | |
| 540 """ | |
| 541 self.protocol = TestTelnet() | |
| 542 | |
| 543 | |
| 544 def test_enableLocal(self): | |
| 545 """ | |
| 546 L{telnet.Telnet.enableLocal} should reject all options, since | |
| 547 L{telnet.Telnet} does not know how to implement any options. | |
| 548 """ | |
| 549 self.assertFalse(self.protocol.enableLocal('\0')) | |
| 550 | |
| 551 | |
| 552 def test_enableRemote(self): | |
| 553 """ | |
| 554 L{telnet.Telnet.enableRemote} should reject all options, since | |
| 555 L{telnet.Telnet} does not know how to implement any options. | |
| 556 """ | |
| 557 self.assertFalse(self.protocol.enableRemote('\0')) | |
| 558 | |
| 559 | |
| 560 def test_disableLocal(self): | |
| 561 """ | |
| 562 It is an error for L{telnet.Telnet.disableLocal} to be called, since | |
| 563 L{telnet.Telnet.enableLocal} will never allow any options to be enabled | |
| 564 locally. If a subclass overrides enableLocal, it must also override | |
| 565 disableLocal. | |
| 566 """ | |
| 567 self.assertRaises(NotImplementedError, self.protocol.disableLocal, '\0') | |
| 568 | |
| 569 | |
| 570 def test_disableRemote(self): | |
| 571 """ | |
| 572 It is an error for L{telnet.Telnet.disableRemote} to be called, since | |
| 573 L{telnet.Telnet.enableRemote} will never allow any options to be | |
| 574 enabled remotely. If a subclass overrides enableRemote, it must also | |
| 575 override disableRemote. | |
| 576 """ | |
| 577 self.assertRaises(NotImplementedError, self.protocol.disableRemote, '\0'
) | |
| 578 | |
| 579 | |
| 580 def _deliver(self, bytes, *expected): | |
| 581 """ | |
| 582 Pass the given bytes to the protocol's C{dataReceived} method and | |
| 583 assert that the given events occur. | |
| 584 """ | |
| 585 received = self.protocol.events = [] | |
| 586 self.protocol.dataReceived(bytes) | |
| 587 self.assertEqual(received, list(expected)) | |
| 588 | |
| 589 | |
| 590 def test_oneApplicationDataByte(self): | |
| 591 """ | |
| 592 One application-data byte in the default state gets delivered right | |
| 593 away. | |
| 594 """ | |
| 595 self._deliver('a', ('bytes', 'a')) | |
| 596 | |
| 597 | |
| 598 def test_twoApplicationDataBytes(self): | |
| 599 """ | |
| 600 Two application-data bytes in the default state get delivered | |
| 601 together. | |
| 602 """ | |
| 603 self._deliver('bc', ('bytes', 'bc')) | |
| 604 | |
| 605 | |
| 606 def test_threeApplicationDataBytes(self): | |
| 607 """ | |
| 608 Three application-data bytes followed by a control byte get | |
| 609 delivered, but the control byte doesn't. | |
| 610 """ | |
| 611 self._deliver('def' + telnet.IAC, ('bytes', 'def')) | |
| 612 | |
| 613 | |
| 614 def test_escapedControl(self): | |
| 615 """ | |
| 616 IAC in the escaped state gets delivered and so does another | |
| 617 application-data byte following it. | |
| 618 """ | |
| 619 self._deliver(telnet.IAC) | |
| 620 self._deliver(telnet.IAC + 'g', ('bytes', telnet.IAC + 'g')) | |
| 621 | |
| 622 | |
| 623 def test_carriageReturn(self): | |
| 624 """ | |
| 625 A carriage return only puts the protocol into the newline state. A | |
| 626 linefeed in the newline state causes just the newline to be | |
| 627 delivered. A nul in the newline state causes a carriage return to | |
| 628 be delivered. Anything else causes a carriage return and that thing | |
| 629 to be delivered. | |
| 630 """ | |
| 631 self._deliver('\r') | |
| 632 self._deliver('\n', ('bytes', '\n')) | |
| 633 self._deliver('\r\n', ('bytes', '\n')) | |
| 634 | |
| 635 self._deliver('\r') | |
| 636 self._deliver('\0', ('bytes', '\r')) | |
| 637 self._deliver('\r\0', ('bytes', '\r')) | |
| 638 | |
| 639 self._deliver('\r') | |
| 640 self._deliver('a', ('bytes', '\ra')) | |
| 641 self._deliver('\ra', ('bytes', '\ra')) | |
| 642 | |
| 643 | |
| 644 def test_applicationDataBeforeSimpleCommand(self): | |
| 645 """ | |
| 646 Application bytes received before a command are delivered before the | |
| 647 command is processed. | |
| 648 """ | |
| 649 self._deliver( | |
| 650 'x' + telnet.IAC + telnet.NOP, | |
| 651 ('bytes', 'x'), ('command', telnet.NOP, None)) | |
| 652 | |
| 653 | |
| 654 def test_applicationDataBeforeCommand(self): | |
| 655 """ | |
| 656 Application bytes received before a WILL/WONT/DO/DONT are delivered | |
| 657 before the command is processed. | |
| 658 """ | |
| 659 self.protocol.commandMap = {} | |
| 660 self._deliver( | |
| 661 'y' + telnet.IAC + telnet.WILL + '\x00', | |
| 662 ('bytes', 'y'), ('command', telnet.WILL, '\x00')) | |
| 663 | |
| 664 | |
| 665 def test_applicationDataBeforeSubnegotiation(self): | |
| 666 """ | |
| 667 Application bytes received before a subnegotiation command are | |
| 668 delivered before the negotiation is processed. | |
| 669 """ | |
| 670 self._deliver( | |
| 671 'z' + telnet.IAC + telnet.SB + 'Qx' + telnet.IAC + telnet.SE, | |
| 672 ('bytes', 'z'), ('negotiate', 'Q', ['x'])) | |
| OLD | NEW |