| OLD | NEW |
| (Empty) |
| 1 # -*- test-case-name: twisted.conch.test.test_recvline -*- | |
| 2 # Copyright (c) 2001-2008 Twisted Matrix Laboratories. | |
| 3 # See LICENSE for details. | |
| 4 | |
| 5 """ | |
| 6 Tests for L{twisted.conch.recvline} and fixtures for testing related | |
| 7 functionality. | |
| 8 """ | |
| 9 | |
| 10 import sys, os | |
| 11 | |
| 12 from twisted.conch.insults import insults | |
| 13 from twisted.conch import recvline | |
| 14 | |
| 15 from twisted.python import reflect, components | |
| 16 from twisted.internet import defer, error | |
| 17 from twisted.trial import unittest | |
| 18 from twisted.cred import portal | |
| 19 from twisted.test.proto_helpers import StringTransport | |
| 20 | |
| 21 class Arrows(unittest.TestCase): | |
| 22 def setUp(self): | |
| 23 self.underlyingTransport = StringTransport() | |
| 24 self.pt = insults.ServerProtocol() | |
| 25 self.p = recvline.HistoricRecvLine() | |
| 26 self.pt.protocolFactory = lambda: self.p | |
| 27 self.pt.factory = self | |
| 28 self.pt.makeConnection(self.underlyingTransport) | |
| 29 # self.p.makeConnection(self.pt) | |
| 30 | |
| 31 def testPrintableCharacters(self): | |
| 32 self.p.keystrokeReceived('x', None) | |
| 33 self.p.keystrokeReceived('y', None) | |
| 34 self.p.keystrokeReceived('z', None) | |
| 35 | |
| 36 self.assertEquals(self.p.currentLineBuffer(), ('xyz', '')) | |
| 37 | |
| 38 def testHorizontalArrows(self): | |
| 39 kR = lambda ch: self.p.keystrokeReceived(ch, None) | |
| 40 for ch in 'xyz': | |
| 41 kR(ch) | |
| 42 | |
| 43 self.assertEquals(self.p.currentLineBuffer(), ('xyz', '')) | |
| 44 | |
| 45 kR(self.pt.RIGHT_ARROW) | |
| 46 self.assertEquals(self.p.currentLineBuffer(), ('xyz', '')) | |
| 47 | |
| 48 kR(self.pt.LEFT_ARROW) | |
| 49 self.assertEquals(self.p.currentLineBuffer(), ('xy', 'z')) | |
| 50 | |
| 51 kR(self.pt.LEFT_ARROW) | |
| 52 self.assertEquals(self.p.currentLineBuffer(), ('x', 'yz')) | |
| 53 | |
| 54 kR(self.pt.LEFT_ARROW) | |
| 55 self.assertEquals(self.p.currentLineBuffer(), ('', 'xyz')) | |
| 56 | |
| 57 kR(self.pt.LEFT_ARROW) | |
| 58 self.assertEquals(self.p.currentLineBuffer(), ('', 'xyz')) | |
| 59 | |
| 60 kR(self.pt.RIGHT_ARROW) | |
| 61 self.assertEquals(self.p.currentLineBuffer(), ('x', 'yz')) | |
| 62 | |
| 63 kR(self.pt.RIGHT_ARROW) | |
| 64 self.assertEquals(self.p.currentLineBuffer(), ('xy', 'z')) | |
| 65 | |
| 66 kR(self.pt.RIGHT_ARROW) | |
| 67 self.assertEquals(self.p.currentLineBuffer(), ('xyz', '')) | |
| 68 | |
| 69 kR(self.pt.RIGHT_ARROW) | |
| 70 self.assertEquals(self.p.currentLineBuffer(), ('xyz', '')) | |
| 71 | |
| 72 def testNewline(self): | |
| 73 kR = lambda ch: self.p.keystrokeReceived(ch, None) | |
| 74 | |
| 75 for ch in 'xyz\nabc\n123\n': | |
| 76 kR(ch) | |
| 77 | |
| 78 self.assertEquals(self.p.currentHistoryBuffer(), | |
| 79 (('xyz', 'abc', '123'), ())) | |
| 80 | |
| 81 kR('c') | |
| 82 kR('b') | |
| 83 kR('a') | |
| 84 self.assertEquals(self.p.currentHistoryBuffer(), | |
| 85 (('xyz', 'abc', '123'), ())) | |
| 86 | |
| 87 kR('\n') | |
| 88 self.assertEquals(self.p.currentHistoryBuffer(), | |
| 89 (('xyz', 'abc', '123', 'cba'), ())) | |
| 90 | |
| 91 def testVerticalArrows(self): | |
| 92 kR = lambda ch: self.p.keystrokeReceived(ch, None) | |
| 93 | |
| 94 for ch in 'xyz\nabc\n123\n': | |
| 95 kR(ch) | |
| 96 | |
| 97 self.assertEquals(self.p.currentHistoryBuffer(), | |
| 98 (('xyz', 'abc', '123'), ())) | |
| 99 self.assertEquals(self.p.currentLineBuffer(), ('', '')) | |
| 100 | |
| 101 kR(self.pt.UP_ARROW) | |
| 102 self.assertEquals(self.p.currentHistoryBuffer(), | |
| 103 (('xyz', 'abc'), ('123',))) | |
| 104 self.assertEquals(self.p.currentLineBuffer(), ('123', '')) | |
| 105 | |
| 106 kR(self.pt.UP_ARROW) | |
| 107 self.assertEquals(self.p.currentHistoryBuffer(), | |
| 108 (('xyz',), ('abc', '123'))) | |
| 109 self.assertEquals(self.p.currentLineBuffer(), ('abc', '')) | |
| 110 | |
| 111 kR(self.pt.UP_ARROW) | |
| 112 self.assertEquals(self.p.currentHistoryBuffer(), | |
| 113 ((), ('xyz', 'abc', '123'))) | |
| 114 self.assertEquals(self.p.currentLineBuffer(), ('xyz', '')) | |
| 115 | |
| 116 kR(self.pt.UP_ARROW) | |
| 117 self.assertEquals(self.p.currentHistoryBuffer(), | |
| 118 ((), ('xyz', 'abc', '123'))) | |
| 119 self.assertEquals(self.p.currentLineBuffer(), ('xyz', '')) | |
| 120 | |
| 121 for i in range(4): | |
| 122 kR(self.pt.DOWN_ARROW) | |
| 123 self.assertEquals(self.p.currentHistoryBuffer(), | |
| 124 (('xyz', 'abc', '123'), ())) | |
| 125 | |
| 126 def testHome(self): | |
| 127 kR = lambda ch: self.p.keystrokeReceived(ch, None) | |
| 128 | |
| 129 for ch in 'hello, world': | |
| 130 kR(ch) | |
| 131 self.assertEquals(self.p.currentLineBuffer(), ('hello, world', '')) | |
| 132 | |
| 133 kR(self.pt.HOME) | |
| 134 self.assertEquals(self.p.currentLineBuffer(), ('', 'hello, world')) | |
| 135 | |
| 136 def testEnd(self): | |
| 137 kR = lambda ch: self.p.keystrokeReceived(ch, None) | |
| 138 | |
| 139 for ch in 'hello, world': | |
| 140 kR(ch) | |
| 141 self.assertEquals(self.p.currentLineBuffer(), ('hello, world', '')) | |
| 142 | |
| 143 kR(self.pt.HOME) | |
| 144 kR(self.pt.END) | |
| 145 self.assertEquals(self.p.currentLineBuffer(), ('hello, world', '')) | |
| 146 | |
| 147 def testBackspace(self): | |
| 148 kR = lambda ch: self.p.keystrokeReceived(ch, None) | |
| 149 | |
| 150 for ch in 'xyz': | |
| 151 kR(ch) | |
| 152 self.assertEquals(self.p.currentLineBuffer(), ('xyz', '')) | |
| 153 | |
| 154 kR(self.pt.BACKSPACE) | |
| 155 self.assertEquals(self.p.currentLineBuffer(), ('xy', '')) | |
| 156 | |
| 157 kR(self.pt.LEFT_ARROW) | |
| 158 kR(self.pt.BACKSPACE) | |
| 159 self.assertEquals(self.p.currentLineBuffer(), ('', 'y')) | |
| 160 | |
| 161 kR(self.pt.BACKSPACE) | |
| 162 self.assertEquals(self.p.currentLineBuffer(), ('', 'y')) | |
| 163 | |
| 164 def testDelete(self): | |
| 165 kR = lambda ch: self.p.keystrokeReceived(ch, None) | |
| 166 | |
| 167 for ch in 'xyz': | |
| 168 kR(ch) | |
| 169 self.assertEquals(self.p.currentLineBuffer(), ('xyz', '')) | |
| 170 | |
| 171 kR(self.pt.DELETE) | |
| 172 self.assertEquals(self.p.currentLineBuffer(), ('xyz', '')) | |
| 173 | |
| 174 kR(self.pt.LEFT_ARROW) | |
| 175 kR(self.pt.DELETE) | |
| 176 self.assertEquals(self.p.currentLineBuffer(), ('xy', '')) | |
| 177 | |
| 178 kR(self.pt.LEFT_ARROW) | |
| 179 kR(self.pt.DELETE) | |
| 180 self.assertEquals(self.p.currentLineBuffer(), ('x', '')) | |
| 181 | |
| 182 kR(self.pt.LEFT_ARROW) | |
| 183 kR(self.pt.DELETE) | |
| 184 self.assertEquals(self.p.currentLineBuffer(), ('', '')) | |
| 185 | |
| 186 kR(self.pt.DELETE) | |
| 187 self.assertEquals(self.p.currentLineBuffer(), ('', '')) | |
| 188 | |
| 189 def testInsert(self): | |
| 190 kR = lambda ch: self.p.keystrokeReceived(ch, None) | |
| 191 | |
| 192 for ch in 'xyz': | |
| 193 kR(ch) | |
| 194 | |
| 195 # kR(self.pt.INSERT) | |
| 196 | |
| 197 kR(self.pt.LEFT_ARROW) | |
| 198 kR('A') | |
| 199 self.assertEquals(self.p.currentLineBuffer(), ('xyA', 'z')) | |
| 200 | |
| 201 kR(self.pt.LEFT_ARROW) | |
| 202 kR('B') | |
| 203 self.assertEquals(self.p.currentLineBuffer(), ('xyB', 'Az')) | |
| 204 | |
| 205 def testTypeover(self): | |
| 206 kR = lambda ch: self.p.keystrokeReceived(ch, None) | |
| 207 | |
| 208 for ch in 'xyz': | |
| 209 kR(ch) | |
| 210 | |
| 211 kR(self.pt.INSERT) | |
| 212 | |
| 213 kR(self.pt.LEFT_ARROW) | |
| 214 kR('A') | |
| 215 self.assertEquals(self.p.currentLineBuffer(), ('xyA', '')) | |
| 216 | |
| 217 kR(self.pt.LEFT_ARROW) | |
| 218 kR('B') | |
| 219 self.assertEquals(self.p.currentLineBuffer(), ('xyB', '')) | |
| 220 | |
| 221 | |
| 222 from twisted.conch import telnet | |
| 223 from twisted.conch.insults import helper | |
| 224 from twisted.protocols import loopback | |
| 225 | |
| 226 class EchoServer(recvline.HistoricRecvLine): | |
| 227 def lineReceived(self, line): | |
| 228 self.terminal.write(line + '\n' + self.ps[self.pn]) | |
| 229 | |
| 230 # An insults API for this would be nice. | |
| 231 left = "\x1b[D" | |
| 232 right = "\x1b[C" | |
| 233 up = "\x1b[A" | |
| 234 down = "\x1b[B" | |
| 235 insert = "\x1b[2~" | |
| 236 home = "\x1b[1~" | |
| 237 delete = "\x1b[3~" | |
| 238 end = "\x1b[4~" | |
| 239 backspace = "\x7f" | |
| 240 | |
| 241 from twisted.cred import checkers | |
| 242 | |
| 243 try: | |
| 244 from twisted.conch.ssh import userauth, transport, channel, connection, sess
ion | |
| 245 from twisted.conch.manhole_ssh import TerminalUser, TerminalSession, Termina
lRealm, TerminalSessionTransport, ConchFactory | |
| 246 except ImportError: | |
| 247 ssh = False | |
| 248 else: | |
| 249 ssh = True | |
| 250 class SessionChannel(channel.SSHChannel): | |
| 251 name = 'session' | |
| 252 | |
| 253 def __init__(self, protocolFactory, protocolArgs, protocolKwArgs, width,
height, *a, **kw): | |
| 254 channel.SSHChannel.__init__(self, *a, **kw) | |
| 255 | |
| 256 self.protocolFactory = protocolFactory | |
| 257 self.protocolArgs = protocolArgs | |
| 258 self.protocolKwArgs = protocolKwArgs | |
| 259 | |
| 260 self.width = width | |
| 261 self.height = height | |
| 262 | |
| 263 def channelOpen(self, data): | |
| 264 term = session.packRequest_pty_req("vt102", (self.height, self.width
, 0, 0), '') | |
| 265 self.conn.sendRequest(self, 'pty-req', term) | |
| 266 self.conn.sendRequest(self, 'shell', '') | |
| 267 | |
| 268 self._protocolInstance = self.protocolFactory(*self.protocolArgs, **
self.protocolKwArgs) | |
| 269 self._protocolInstance.factory = self | |
| 270 self._protocolInstance.makeConnection(self) | |
| 271 | |
| 272 def closed(self): | |
| 273 self._protocolInstance.connectionLost(error.ConnectionDone()) | |
| 274 | |
| 275 def dataReceived(self, data): | |
| 276 self._protocolInstance.dataReceived(data) | |
| 277 | |
| 278 class TestConnection(connection.SSHConnection): | |
| 279 def __init__(self, protocolFactory, protocolArgs, protocolKwArgs, width,
height, *a, **kw): | |
| 280 connection.SSHConnection.__init__(self, *a, **kw) | |
| 281 | |
| 282 self.protocolFactory = protocolFactory | |
| 283 self.protocolArgs = protocolArgs | |
| 284 self.protocolKwArgs = protocolKwArgs | |
| 285 | |
| 286 self.width = width | |
| 287 self.height = height | |
| 288 | |
| 289 def serviceStarted(self): | |
| 290 self.__channel = SessionChannel(self.protocolFactory, self.protocolA
rgs, self.protocolKwArgs, self.width, self.height) | |
| 291 self.openChannel(self.__channel) | |
| 292 | |
| 293 def write(self, bytes): | |
| 294 return self.__channel.write(bytes) | |
| 295 | |
| 296 class TestAuth(userauth.SSHUserAuthClient): | |
| 297 def __init__(self, username, password, *a, **kw): | |
| 298 userauth.SSHUserAuthClient.__init__(self, username, *a, **kw) | |
| 299 self.password = password | |
| 300 | |
| 301 def getPassword(self): | |
| 302 return defer.succeed(self.password) | |
| 303 | |
| 304 class TestTransport(transport.SSHClientTransport): | |
| 305 def __init__(self, protocolFactory, protocolArgs, protocolKwArgs, userna
me, password, width, height, *a, **kw): | |
| 306 # transport.SSHClientTransport.__init__(self, *a, **kw) | |
| 307 self.protocolFactory = protocolFactory | |
| 308 self.protocolArgs = protocolArgs | |
| 309 self.protocolKwArgs = protocolKwArgs | |
| 310 self.username = username | |
| 311 self.password = password | |
| 312 self.width = width | |
| 313 self.height = height | |
| 314 | |
| 315 def verifyHostKey(self, hostKey, fingerprint): | |
| 316 return defer.succeed(True) | |
| 317 | |
| 318 def connectionSecure(self): | |
| 319 self.__connection = TestConnection(self.protocolFactory, self.protoc
olArgs, self.protocolKwArgs, self.width, self.height) | |
| 320 self.requestService( | |
| 321 TestAuth(self.username, self.password, self.__connection)) | |
| 322 | |
| 323 def write(self, bytes): | |
| 324 return self.__connection.write(bytes) | |
| 325 | |
| 326 class TestSessionTransport(TerminalSessionTransport): | |
| 327 def protocolFactory(self): | |
| 328 return self.avatar.conn.transport.factory.serverProtocol() | |
| 329 | |
| 330 class TestSession(TerminalSession): | |
| 331 transportFactory = TestSessionTransport | |
| 332 | |
| 333 class TestUser(TerminalUser): | |
| 334 pass | |
| 335 | |
| 336 components.registerAdapter(TestSession, TestUser, session.ISession) | |
| 337 | |
| 338 | |
| 339 class LoopbackRelay(loopback.LoopbackRelay): | |
| 340 clearCall = None | |
| 341 | |
| 342 def logPrefix(self): | |
| 343 return "LoopbackRelay(%r)" % (self.target.__class__.__name__,) | |
| 344 | |
| 345 def write(self, bytes): | |
| 346 loopback.LoopbackRelay.write(self, bytes) | |
| 347 if self.clearCall is not None: | |
| 348 self.clearCall.cancel() | |
| 349 | |
| 350 from twisted.internet import reactor | |
| 351 self.clearCall = reactor.callLater(0, self._clearBuffer) | |
| 352 | |
| 353 def _clearBuffer(self): | |
| 354 self.clearCall = None | |
| 355 loopback.LoopbackRelay.clearBuffer(self) | |
| 356 | |
| 357 | |
| 358 class NotifyingExpectableBuffer(helper.ExpectableBuffer): | |
| 359 def __init__(self): | |
| 360 self.onConnection = defer.Deferred() | |
| 361 self.onDisconnection = defer.Deferred() | |
| 362 | |
| 363 def connectionMade(self): | |
| 364 helper.ExpectableBuffer.connectionMade(self) | |
| 365 self.onConnection.callback(self) | |
| 366 | |
| 367 def connectionLost(self, reason): | |
| 368 self.onDisconnection.errback(reason) | |
| 369 | |
| 370 | |
| 371 class _BaseMixin: | |
| 372 WIDTH = 80 | |
| 373 HEIGHT = 24 | |
| 374 | |
| 375 def _assertBuffer(self, lines): | |
| 376 receivedLines = str(self.recvlineClient).splitlines() | |
| 377 expectedLines = lines + ([''] * (self.HEIGHT - len(lines) - 1)) | |
| 378 self.assertEquals(len(receivedLines), len(expectedLines)) | |
| 379 for i in range(len(receivedLines)): | |
| 380 self.assertEquals( | |
| 381 receivedLines[i], expectedLines[i], | |
| 382 str(receivedLines[max(0, i-1):i+1]) + | |
| 383 " != " + | |
| 384 str(expectedLines[max(0, i-1):i+1])) | |
| 385 | |
| 386 def _trivialTest(self, input, output): | |
| 387 done = self.recvlineClient.expect("done") | |
| 388 | |
| 389 self._testwrite(input) | |
| 390 | |
| 391 def finished(ign): | |
| 392 self._assertBuffer(output) | |
| 393 | |
| 394 return done.addCallback(finished) | |
| 395 | |
| 396 | |
| 397 class _SSHMixin(_BaseMixin): | |
| 398 def setUp(self): | |
| 399 if not ssh: | |
| 400 raise unittest.SkipTest("Crypto requirements missing, can't run hist
oric recvline tests over ssh") | |
| 401 | |
| 402 u, p = 'testuser', 'testpass' | |
| 403 rlm = TerminalRealm() | |
| 404 rlm.userFactory = TestUser | |
| 405 rlm.chainedProtocolFactory = lambda: insultsServer | |
| 406 | |
| 407 ptl = portal.Portal( | |
| 408 rlm, | |
| 409 [checkers.InMemoryUsernamePasswordDatabaseDontUse(**{u: p})]) | |
| 410 sshFactory = ConchFactory(ptl) | |
| 411 sshFactory.serverProtocol = self.serverProtocol | |
| 412 sshFactory.startFactory() | |
| 413 | |
| 414 recvlineServer = self.serverProtocol() | |
| 415 insultsServer = insults.ServerProtocol(lambda: recvlineServer) | |
| 416 sshServer = sshFactory.buildProtocol(None) | |
| 417 clientTransport = LoopbackRelay(sshServer) | |
| 418 | |
| 419 recvlineClient = NotifyingExpectableBuffer() | |
| 420 insultsClient = insults.ClientProtocol(lambda: recvlineClient) | |
| 421 sshClient = TestTransport(lambda: insultsClient, (), {}, u, p, self.WIDT
H, self.HEIGHT) | |
| 422 serverTransport = LoopbackRelay(sshClient) | |
| 423 | |
| 424 sshClient.makeConnection(clientTransport) | |
| 425 sshServer.makeConnection(serverTransport) | |
| 426 | |
| 427 self.recvlineClient = recvlineClient | |
| 428 self.sshClient = sshClient | |
| 429 self.sshServer = sshServer | |
| 430 self.clientTransport = clientTransport | |
| 431 self.serverTransport = serverTransport | |
| 432 | |
| 433 return recvlineClient.onConnection | |
| 434 | |
| 435 def _testwrite(self, bytes): | |
| 436 self.sshClient.write(bytes) | |
| 437 | |
| 438 from twisted.conch.test import test_telnet | |
| 439 | |
| 440 class TestInsultsClientProtocol(insults.ClientProtocol, | |
| 441 test_telnet.TestProtocol): | |
| 442 pass | |
| 443 | |
| 444 | |
| 445 class TestInsultsServerProtocol(insults.ServerProtocol, | |
| 446 test_telnet.TestProtocol): | |
| 447 pass | |
| 448 | |
| 449 class _TelnetMixin(_BaseMixin): | |
| 450 def setUp(self): | |
| 451 recvlineServer = self.serverProtocol() | |
| 452 insultsServer = TestInsultsServerProtocol(lambda: recvlineServer) | |
| 453 telnetServer = telnet.TelnetTransport(lambda: insultsServer) | |
| 454 clientTransport = LoopbackRelay(telnetServer) | |
| 455 | |
| 456 recvlineClient = NotifyingExpectableBuffer() | |
| 457 insultsClient = TestInsultsClientProtocol(lambda: recvlineClient) | |
| 458 telnetClient = telnet.TelnetTransport(lambda: insultsClient) | |
| 459 serverTransport = LoopbackRelay(telnetClient) | |
| 460 | |
| 461 telnetClient.makeConnection(clientTransport) | |
| 462 telnetServer.makeConnection(serverTransport) | |
| 463 | |
| 464 serverTransport.clearBuffer() | |
| 465 clientTransport.clearBuffer() | |
| 466 | |
| 467 self.recvlineClient = recvlineClient | |
| 468 self.telnetClient = telnetClient | |
| 469 self.clientTransport = clientTransport | |
| 470 self.serverTransport = serverTransport | |
| 471 | |
| 472 return recvlineClient.onConnection | |
| 473 | |
| 474 def _testwrite(self, bytes): | |
| 475 self.telnetClient.write(bytes) | |
| 476 | |
| 477 try: | |
| 478 from twisted.conch import stdio | |
| 479 except ImportError: | |
| 480 stdio = None | |
| 481 | |
| 482 class _StdioMixin(_BaseMixin): | |
| 483 def setUp(self): | |
| 484 # A memory-only terminal emulator, into which the server will | |
| 485 # write things and make other state changes. What ends up | |
| 486 # here is basically what a user would have seen on their | |
| 487 # screen. | |
| 488 testTerminal = NotifyingExpectableBuffer() | |
| 489 | |
| 490 # An insults client protocol which will translate bytes | |
| 491 # received from the child process into keystroke commands for | |
| 492 # an ITerminalProtocol. | |
| 493 insultsClient = insults.ClientProtocol(lambda: testTerminal) | |
| 494 | |
| 495 # A process protocol which will translate stdout and stderr | |
| 496 # received from the child process to dataReceived calls and | |
| 497 # error reporting on an insults client protocol. | |
| 498 processClient = stdio.TerminalProcessProtocol(insultsClient) | |
| 499 | |
| 500 # Run twisted/conch/stdio.py with the name of a class | |
| 501 # implementing ITerminalProtocol. This class will be used to | |
| 502 # handle bytes we send to the child process. | |
| 503 exe = sys.executable | |
| 504 module = stdio.__file__ | |
| 505 if module.endswith('.pyc') or module.endswith('.pyo'): | |
| 506 module = module[:-1] | |
| 507 args = [exe, module, reflect.qual(self.serverProtocol)] | |
| 508 env = {"PYTHONPATH": os.pathsep.join(sys.path)} | |
| 509 | |
| 510 from twisted.internet import reactor | |
| 511 clientTransport = reactor.spawnProcess(processClient, exe, args, | |
| 512 env=env, usePTY=True) | |
| 513 | |
| 514 self.recvlineClient = self.testTerminal = testTerminal | |
| 515 self.processClient = processClient | |
| 516 self.clientTransport = clientTransport | |
| 517 | |
| 518 # Wait for the process protocol and test terminal to become | |
| 519 # connected before proceeding. The former should always | |
| 520 # happen first, but it doesn't hurt to be safe. | |
| 521 return defer.gatherResults(filter(None, [ | |
| 522 processClient.onConnection, | |
| 523 testTerminal.expect(">>> ")])) | |
| 524 | |
| 525 def tearDown(self): | |
| 526 # Kill the child process. We're done with it. | |
| 527 try: | |
| 528 self.clientTransport.signalProcess("KILL") | |
| 529 except (error.ProcessExitedAlready, OSError): | |
| 530 pass | |
| 531 def trap(failure): | |
| 532 failure.trap(error.ProcessTerminated) | |
| 533 self.assertEquals(failure.value.exitCode, None) | |
| 534 self.assertEquals(failure.value.status, 9) | |
| 535 return self.testTerminal.onDisconnection.addErrback(trap) | |
| 536 | |
| 537 def _testwrite(self, bytes): | |
| 538 self.clientTransport.write(bytes) | |
| 539 | |
| 540 class RecvlineLoopbackMixin: | |
| 541 serverProtocol = EchoServer | |
| 542 | |
| 543 def testSimple(self): | |
| 544 return self._trivialTest( | |
| 545 "first line\ndone", | |
| 546 [">>> first line", | |
| 547 "first line", | |
| 548 ">>> done"]) | |
| 549 | |
| 550 def testLeftArrow(self): | |
| 551 return self._trivialTest( | |
| 552 insert + 'first line' + left * 4 + "xxxx\ndone", | |
| 553 [">>> first xxxx", | |
| 554 "first xxxx", | |
| 555 ">>> done"]) | |
| 556 | |
| 557 def testRightArrow(self): | |
| 558 return self._trivialTest( | |
| 559 insert + 'right line' + left * 4 + right * 2 + "xx\ndone", | |
| 560 [">>> right lixx", | |
| 561 "right lixx", | |
| 562 ">>> done"]) | |
| 563 | |
| 564 def testBackspace(self): | |
| 565 return self._trivialTest( | |
| 566 "second line" + backspace * 4 + "xxxx\ndone", | |
| 567 [">>> second xxxx", | |
| 568 "second xxxx", | |
| 569 ">>> done"]) | |
| 570 | |
| 571 def testDelete(self): | |
| 572 return self._trivialTest( | |
| 573 "delete xxxx" + left * 4 + delete * 4 + "line\ndone", | |
| 574 [">>> delete line", | |
| 575 "delete line", | |
| 576 ">>> done"]) | |
| 577 | |
| 578 def testInsert(self): | |
| 579 return self._trivialTest( | |
| 580 "third ine" + left * 3 + "l\ndone", | |
| 581 [">>> third line", | |
| 582 "third line", | |
| 583 ">>> done"]) | |
| 584 | |
| 585 def testTypeover(self): | |
| 586 return self._trivialTest( | |
| 587 "fourth xine" + left * 4 + insert + "l\ndone", | |
| 588 [">>> fourth line", | |
| 589 "fourth line", | |
| 590 ">>> done"]) | |
| 591 | |
| 592 def testHome(self): | |
| 593 return self._trivialTest( | |
| 594 insert + "blah line" + home + "home\ndone", | |
| 595 [">>> home line", | |
| 596 "home line", | |
| 597 ">>> done"]) | |
| 598 | |
| 599 def testEnd(self): | |
| 600 return self._trivialTest( | |
| 601 "end " + left * 4 + end + "line\ndone", | |
| 602 [">>> end line", | |
| 603 "end line", | |
| 604 ">>> done"]) | |
| 605 | |
| 606 class RecvlineLoopbackTelnet(_TelnetMixin, unittest.TestCase, RecvlineLoopbackMi
xin): | |
| 607 pass | |
| 608 | |
| 609 class RecvlineLoopbackSSH(_SSHMixin, unittest.TestCase, RecvlineLoopbackMixin): | |
| 610 pass | |
| 611 | |
| 612 class RecvlineLoopbackStdio(_StdioMixin, unittest.TestCase, RecvlineLoopbackMixi
n): | |
| 613 if stdio is None: | |
| 614 skip = "Terminal requirements missing, can't run recvline tests over std
io" | |
| 615 | |
| 616 | |
| 617 class HistoricRecvlineLoopbackMixin: | |
| 618 serverProtocol = EchoServer | |
| 619 | |
| 620 def testUpArrow(self): | |
| 621 return self._trivialTest( | |
| 622 "first line\n" + up + "\ndone", | |
| 623 [">>> first line", | |
| 624 "first line", | |
| 625 ">>> first line", | |
| 626 "first line", | |
| 627 ">>> done"]) | |
| 628 | |
| 629 def testDownArrow(self): | |
| 630 return self._trivialTest( | |
| 631 "first line\nsecond line\n" + up * 2 + down + "\ndone", | |
| 632 [">>> first line", | |
| 633 "first line", | |
| 634 ">>> second line", | |
| 635 "second line", | |
| 636 ">>> second line", | |
| 637 "second line", | |
| 638 ">>> done"]) | |
| 639 | |
| 640 class HistoricRecvlineLoopbackTelnet(_TelnetMixin, unittest.TestCase, HistoricRe
cvlineLoopbackMixin): | |
| 641 pass | |
| 642 | |
| 643 class HistoricRecvlineLoopbackSSH(_SSHMixin, unittest.TestCase, HistoricRecvline
LoopbackMixin): | |
| 644 pass | |
| 645 | |
| 646 class HistoricRecvlineLoopbackStdio(_StdioMixin, unittest.TestCase, HistoricRecv
lineLoopbackMixin): | |
| 647 if stdio is None: | |
| 648 skip = "Terminal requirements missing, can't run historic recvline tests
over stdio" | |
| OLD | NEW |