Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(338)

Side by Side Diff: third_party/twisted_8_1/twisted/conch/test/test_recvline.py

Issue 12261012: Remove third_party/twisted_8_1 (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/build
Patch Set: Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 # -*- test-case-name: twisted.conch.test.test_recvline -*-
2 # Copyright (c) 2001-2008 Twisted Matrix Laboratories.
3 # See LICENSE for details.
4
5 """
6 Tests for L{twisted.conch.recvline} and fixtures for testing related
7 functionality.
8 """
9
10 import sys, os
11
12 from twisted.conch.insults import insults
13 from twisted.conch import recvline
14
15 from twisted.python import reflect, components
16 from twisted.internet import defer, error
17 from twisted.trial import unittest
18 from twisted.cred import portal
19 from twisted.test.proto_helpers import StringTransport
20
21 class Arrows(unittest.TestCase):
22 def setUp(self):
23 self.underlyingTransport = StringTransport()
24 self.pt = insults.ServerProtocol()
25 self.p = recvline.HistoricRecvLine()
26 self.pt.protocolFactory = lambda: self.p
27 self.pt.factory = self
28 self.pt.makeConnection(self.underlyingTransport)
29 # self.p.makeConnection(self.pt)
30
31 def testPrintableCharacters(self):
32 self.p.keystrokeReceived('x', None)
33 self.p.keystrokeReceived('y', None)
34 self.p.keystrokeReceived('z', None)
35
36 self.assertEquals(self.p.currentLineBuffer(), ('xyz', ''))
37
38 def testHorizontalArrows(self):
39 kR = lambda ch: self.p.keystrokeReceived(ch, None)
40 for ch in 'xyz':
41 kR(ch)
42
43 self.assertEquals(self.p.currentLineBuffer(), ('xyz', ''))
44
45 kR(self.pt.RIGHT_ARROW)
46 self.assertEquals(self.p.currentLineBuffer(), ('xyz', ''))
47
48 kR(self.pt.LEFT_ARROW)
49 self.assertEquals(self.p.currentLineBuffer(), ('xy', 'z'))
50
51 kR(self.pt.LEFT_ARROW)
52 self.assertEquals(self.p.currentLineBuffer(), ('x', 'yz'))
53
54 kR(self.pt.LEFT_ARROW)
55 self.assertEquals(self.p.currentLineBuffer(), ('', 'xyz'))
56
57 kR(self.pt.LEFT_ARROW)
58 self.assertEquals(self.p.currentLineBuffer(), ('', 'xyz'))
59
60 kR(self.pt.RIGHT_ARROW)
61 self.assertEquals(self.p.currentLineBuffer(), ('x', 'yz'))
62
63 kR(self.pt.RIGHT_ARROW)
64 self.assertEquals(self.p.currentLineBuffer(), ('xy', 'z'))
65
66 kR(self.pt.RIGHT_ARROW)
67 self.assertEquals(self.p.currentLineBuffer(), ('xyz', ''))
68
69 kR(self.pt.RIGHT_ARROW)
70 self.assertEquals(self.p.currentLineBuffer(), ('xyz', ''))
71
72 def testNewline(self):
73 kR = lambda ch: self.p.keystrokeReceived(ch, None)
74
75 for ch in 'xyz\nabc\n123\n':
76 kR(ch)
77
78 self.assertEquals(self.p.currentHistoryBuffer(),
79 (('xyz', 'abc', '123'), ()))
80
81 kR('c')
82 kR('b')
83 kR('a')
84 self.assertEquals(self.p.currentHistoryBuffer(),
85 (('xyz', 'abc', '123'), ()))
86
87 kR('\n')
88 self.assertEquals(self.p.currentHistoryBuffer(),
89 (('xyz', 'abc', '123', 'cba'), ()))
90
91 def testVerticalArrows(self):
92 kR = lambda ch: self.p.keystrokeReceived(ch, None)
93
94 for ch in 'xyz\nabc\n123\n':
95 kR(ch)
96
97 self.assertEquals(self.p.currentHistoryBuffer(),
98 (('xyz', 'abc', '123'), ()))
99 self.assertEquals(self.p.currentLineBuffer(), ('', ''))
100
101 kR(self.pt.UP_ARROW)
102 self.assertEquals(self.p.currentHistoryBuffer(),
103 (('xyz', 'abc'), ('123',)))
104 self.assertEquals(self.p.currentLineBuffer(), ('123', ''))
105
106 kR(self.pt.UP_ARROW)
107 self.assertEquals(self.p.currentHistoryBuffer(),
108 (('xyz',), ('abc', '123')))
109 self.assertEquals(self.p.currentLineBuffer(), ('abc', ''))
110
111 kR(self.pt.UP_ARROW)
112 self.assertEquals(self.p.currentHistoryBuffer(),
113 ((), ('xyz', 'abc', '123')))
114 self.assertEquals(self.p.currentLineBuffer(), ('xyz', ''))
115
116 kR(self.pt.UP_ARROW)
117 self.assertEquals(self.p.currentHistoryBuffer(),
118 ((), ('xyz', 'abc', '123')))
119 self.assertEquals(self.p.currentLineBuffer(), ('xyz', ''))
120
121 for i in range(4):
122 kR(self.pt.DOWN_ARROW)
123 self.assertEquals(self.p.currentHistoryBuffer(),
124 (('xyz', 'abc', '123'), ()))
125
126 def testHome(self):
127 kR = lambda ch: self.p.keystrokeReceived(ch, None)
128
129 for ch in 'hello, world':
130 kR(ch)
131 self.assertEquals(self.p.currentLineBuffer(), ('hello, world', ''))
132
133 kR(self.pt.HOME)
134 self.assertEquals(self.p.currentLineBuffer(), ('', 'hello, world'))
135
136 def testEnd(self):
137 kR = lambda ch: self.p.keystrokeReceived(ch, None)
138
139 for ch in 'hello, world':
140 kR(ch)
141 self.assertEquals(self.p.currentLineBuffer(), ('hello, world', ''))
142
143 kR(self.pt.HOME)
144 kR(self.pt.END)
145 self.assertEquals(self.p.currentLineBuffer(), ('hello, world', ''))
146
147 def testBackspace(self):
148 kR = lambda ch: self.p.keystrokeReceived(ch, None)
149
150 for ch in 'xyz':
151 kR(ch)
152 self.assertEquals(self.p.currentLineBuffer(), ('xyz', ''))
153
154 kR(self.pt.BACKSPACE)
155 self.assertEquals(self.p.currentLineBuffer(), ('xy', ''))
156
157 kR(self.pt.LEFT_ARROW)
158 kR(self.pt.BACKSPACE)
159 self.assertEquals(self.p.currentLineBuffer(), ('', 'y'))
160
161 kR(self.pt.BACKSPACE)
162 self.assertEquals(self.p.currentLineBuffer(), ('', 'y'))
163
164 def testDelete(self):
165 kR = lambda ch: self.p.keystrokeReceived(ch, None)
166
167 for ch in 'xyz':
168 kR(ch)
169 self.assertEquals(self.p.currentLineBuffer(), ('xyz', ''))
170
171 kR(self.pt.DELETE)
172 self.assertEquals(self.p.currentLineBuffer(), ('xyz', ''))
173
174 kR(self.pt.LEFT_ARROW)
175 kR(self.pt.DELETE)
176 self.assertEquals(self.p.currentLineBuffer(), ('xy', ''))
177
178 kR(self.pt.LEFT_ARROW)
179 kR(self.pt.DELETE)
180 self.assertEquals(self.p.currentLineBuffer(), ('x', ''))
181
182 kR(self.pt.LEFT_ARROW)
183 kR(self.pt.DELETE)
184 self.assertEquals(self.p.currentLineBuffer(), ('', ''))
185
186 kR(self.pt.DELETE)
187 self.assertEquals(self.p.currentLineBuffer(), ('', ''))
188
189 def testInsert(self):
190 kR = lambda ch: self.p.keystrokeReceived(ch, None)
191
192 for ch in 'xyz':
193 kR(ch)
194
195 # kR(self.pt.INSERT)
196
197 kR(self.pt.LEFT_ARROW)
198 kR('A')
199 self.assertEquals(self.p.currentLineBuffer(), ('xyA', 'z'))
200
201 kR(self.pt.LEFT_ARROW)
202 kR('B')
203 self.assertEquals(self.p.currentLineBuffer(), ('xyB', 'Az'))
204
205 def testTypeover(self):
206 kR = lambda ch: self.p.keystrokeReceived(ch, None)
207
208 for ch in 'xyz':
209 kR(ch)
210
211 kR(self.pt.INSERT)
212
213 kR(self.pt.LEFT_ARROW)
214 kR('A')
215 self.assertEquals(self.p.currentLineBuffer(), ('xyA', ''))
216
217 kR(self.pt.LEFT_ARROW)
218 kR('B')
219 self.assertEquals(self.p.currentLineBuffer(), ('xyB', ''))
220
221
222 from twisted.conch import telnet
223 from twisted.conch.insults import helper
224 from twisted.protocols import loopback
225
226 class EchoServer(recvline.HistoricRecvLine):
227 def lineReceived(self, line):
228 self.terminal.write(line + '\n' + self.ps[self.pn])
229
230 # An insults API for this would be nice.
231 left = "\x1b[D"
232 right = "\x1b[C"
233 up = "\x1b[A"
234 down = "\x1b[B"
235 insert = "\x1b[2~"
236 home = "\x1b[1~"
237 delete = "\x1b[3~"
238 end = "\x1b[4~"
239 backspace = "\x7f"
240
241 from twisted.cred import checkers
242
243 try:
244 from twisted.conch.ssh import userauth, transport, channel, connection, sess ion
245 from twisted.conch.manhole_ssh import TerminalUser, TerminalSession, Termina lRealm, TerminalSessionTransport, ConchFactory
246 except ImportError:
247 ssh = False
248 else:
249 ssh = True
250 class SessionChannel(channel.SSHChannel):
251 name = 'session'
252
253 def __init__(self, protocolFactory, protocolArgs, protocolKwArgs, width, height, *a, **kw):
254 channel.SSHChannel.__init__(self, *a, **kw)
255
256 self.protocolFactory = protocolFactory
257 self.protocolArgs = protocolArgs
258 self.protocolKwArgs = protocolKwArgs
259
260 self.width = width
261 self.height = height
262
263 def channelOpen(self, data):
264 term = session.packRequest_pty_req("vt102", (self.height, self.width , 0, 0), '')
265 self.conn.sendRequest(self, 'pty-req', term)
266 self.conn.sendRequest(self, 'shell', '')
267
268 self._protocolInstance = self.protocolFactory(*self.protocolArgs, ** self.protocolKwArgs)
269 self._protocolInstance.factory = self
270 self._protocolInstance.makeConnection(self)
271
272 def closed(self):
273 self._protocolInstance.connectionLost(error.ConnectionDone())
274
275 def dataReceived(self, data):
276 self._protocolInstance.dataReceived(data)
277
278 class TestConnection(connection.SSHConnection):
279 def __init__(self, protocolFactory, protocolArgs, protocolKwArgs, width, height, *a, **kw):
280 connection.SSHConnection.__init__(self, *a, **kw)
281
282 self.protocolFactory = protocolFactory
283 self.protocolArgs = protocolArgs
284 self.protocolKwArgs = protocolKwArgs
285
286 self.width = width
287 self.height = height
288
289 def serviceStarted(self):
290 self.__channel = SessionChannel(self.protocolFactory, self.protocolA rgs, self.protocolKwArgs, self.width, self.height)
291 self.openChannel(self.__channel)
292
293 def write(self, bytes):
294 return self.__channel.write(bytes)
295
296 class TestAuth(userauth.SSHUserAuthClient):
297 def __init__(self, username, password, *a, **kw):
298 userauth.SSHUserAuthClient.__init__(self, username, *a, **kw)
299 self.password = password
300
301 def getPassword(self):
302 return defer.succeed(self.password)
303
304 class TestTransport(transport.SSHClientTransport):
305 def __init__(self, protocolFactory, protocolArgs, protocolKwArgs, userna me, password, width, height, *a, **kw):
306 # transport.SSHClientTransport.__init__(self, *a, **kw)
307 self.protocolFactory = protocolFactory
308 self.protocolArgs = protocolArgs
309 self.protocolKwArgs = protocolKwArgs
310 self.username = username
311 self.password = password
312 self.width = width
313 self.height = height
314
315 def verifyHostKey(self, hostKey, fingerprint):
316 return defer.succeed(True)
317
318 def connectionSecure(self):
319 self.__connection = TestConnection(self.protocolFactory, self.protoc olArgs, self.protocolKwArgs, self.width, self.height)
320 self.requestService(
321 TestAuth(self.username, self.password, self.__connection))
322
323 def write(self, bytes):
324 return self.__connection.write(bytes)
325
326 class TestSessionTransport(TerminalSessionTransport):
327 def protocolFactory(self):
328 return self.avatar.conn.transport.factory.serverProtocol()
329
330 class TestSession(TerminalSession):
331 transportFactory = TestSessionTransport
332
333 class TestUser(TerminalUser):
334 pass
335
336 components.registerAdapter(TestSession, TestUser, session.ISession)
337
338
339 class LoopbackRelay(loopback.LoopbackRelay):
340 clearCall = None
341
342 def logPrefix(self):
343 return "LoopbackRelay(%r)" % (self.target.__class__.__name__,)
344
345 def write(self, bytes):
346 loopback.LoopbackRelay.write(self, bytes)
347 if self.clearCall is not None:
348 self.clearCall.cancel()
349
350 from twisted.internet import reactor
351 self.clearCall = reactor.callLater(0, self._clearBuffer)
352
353 def _clearBuffer(self):
354 self.clearCall = None
355 loopback.LoopbackRelay.clearBuffer(self)
356
357
358 class NotifyingExpectableBuffer(helper.ExpectableBuffer):
359 def __init__(self):
360 self.onConnection = defer.Deferred()
361 self.onDisconnection = defer.Deferred()
362
363 def connectionMade(self):
364 helper.ExpectableBuffer.connectionMade(self)
365 self.onConnection.callback(self)
366
367 def connectionLost(self, reason):
368 self.onDisconnection.errback(reason)
369
370
371 class _BaseMixin:
372 WIDTH = 80
373 HEIGHT = 24
374
375 def _assertBuffer(self, lines):
376 receivedLines = str(self.recvlineClient).splitlines()
377 expectedLines = lines + ([''] * (self.HEIGHT - len(lines) - 1))
378 self.assertEquals(len(receivedLines), len(expectedLines))
379 for i in range(len(receivedLines)):
380 self.assertEquals(
381 receivedLines[i], expectedLines[i],
382 str(receivedLines[max(0, i-1):i+1]) +
383 " != " +
384 str(expectedLines[max(0, i-1):i+1]))
385
386 def _trivialTest(self, input, output):
387 done = self.recvlineClient.expect("done")
388
389 self._testwrite(input)
390
391 def finished(ign):
392 self._assertBuffer(output)
393
394 return done.addCallback(finished)
395
396
397 class _SSHMixin(_BaseMixin):
398 def setUp(self):
399 if not ssh:
400 raise unittest.SkipTest("Crypto requirements missing, can't run hist oric recvline tests over ssh")
401
402 u, p = 'testuser', 'testpass'
403 rlm = TerminalRealm()
404 rlm.userFactory = TestUser
405 rlm.chainedProtocolFactory = lambda: insultsServer
406
407 ptl = portal.Portal(
408 rlm,
409 [checkers.InMemoryUsernamePasswordDatabaseDontUse(**{u: p})])
410 sshFactory = ConchFactory(ptl)
411 sshFactory.serverProtocol = self.serverProtocol
412 sshFactory.startFactory()
413
414 recvlineServer = self.serverProtocol()
415 insultsServer = insults.ServerProtocol(lambda: recvlineServer)
416 sshServer = sshFactory.buildProtocol(None)
417 clientTransport = LoopbackRelay(sshServer)
418
419 recvlineClient = NotifyingExpectableBuffer()
420 insultsClient = insults.ClientProtocol(lambda: recvlineClient)
421 sshClient = TestTransport(lambda: insultsClient, (), {}, u, p, self.WIDT H, self.HEIGHT)
422 serverTransport = LoopbackRelay(sshClient)
423
424 sshClient.makeConnection(clientTransport)
425 sshServer.makeConnection(serverTransport)
426
427 self.recvlineClient = recvlineClient
428 self.sshClient = sshClient
429 self.sshServer = sshServer
430 self.clientTransport = clientTransport
431 self.serverTransport = serverTransport
432
433 return recvlineClient.onConnection
434
435 def _testwrite(self, bytes):
436 self.sshClient.write(bytes)
437
438 from twisted.conch.test import test_telnet
439
440 class TestInsultsClientProtocol(insults.ClientProtocol,
441 test_telnet.TestProtocol):
442 pass
443
444
445 class TestInsultsServerProtocol(insults.ServerProtocol,
446 test_telnet.TestProtocol):
447 pass
448
449 class _TelnetMixin(_BaseMixin):
450 def setUp(self):
451 recvlineServer = self.serverProtocol()
452 insultsServer = TestInsultsServerProtocol(lambda: recvlineServer)
453 telnetServer = telnet.TelnetTransport(lambda: insultsServer)
454 clientTransport = LoopbackRelay(telnetServer)
455
456 recvlineClient = NotifyingExpectableBuffer()
457 insultsClient = TestInsultsClientProtocol(lambda: recvlineClient)
458 telnetClient = telnet.TelnetTransport(lambda: insultsClient)
459 serverTransport = LoopbackRelay(telnetClient)
460
461 telnetClient.makeConnection(clientTransport)
462 telnetServer.makeConnection(serverTransport)
463
464 serverTransport.clearBuffer()
465 clientTransport.clearBuffer()
466
467 self.recvlineClient = recvlineClient
468 self.telnetClient = telnetClient
469 self.clientTransport = clientTransport
470 self.serverTransport = serverTransport
471
472 return recvlineClient.onConnection
473
474 def _testwrite(self, bytes):
475 self.telnetClient.write(bytes)
476
477 try:
478 from twisted.conch import stdio
479 except ImportError:
480 stdio = None
481
482 class _StdioMixin(_BaseMixin):
483 def setUp(self):
484 # A memory-only terminal emulator, into which the server will
485 # write things and make other state changes. What ends up
486 # here is basically what a user would have seen on their
487 # screen.
488 testTerminal = NotifyingExpectableBuffer()
489
490 # An insults client protocol which will translate bytes
491 # received from the child process into keystroke commands for
492 # an ITerminalProtocol.
493 insultsClient = insults.ClientProtocol(lambda: testTerminal)
494
495 # A process protocol which will translate stdout and stderr
496 # received from the child process to dataReceived calls and
497 # error reporting on an insults client protocol.
498 processClient = stdio.TerminalProcessProtocol(insultsClient)
499
500 # Run twisted/conch/stdio.py with the name of a class
501 # implementing ITerminalProtocol. This class will be used to
502 # handle bytes we send to the child process.
503 exe = sys.executable
504 module = stdio.__file__
505 if module.endswith('.pyc') or module.endswith('.pyo'):
506 module = module[:-1]
507 args = [exe, module, reflect.qual(self.serverProtocol)]
508 env = {"PYTHONPATH": os.pathsep.join(sys.path)}
509
510 from twisted.internet import reactor
511 clientTransport = reactor.spawnProcess(processClient, exe, args,
512 env=env, usePTY=True)
513
514 self.recvlineClient = self.testTerminal = testTerminal
515 self.processClient = processClient
516 self.clientTransport = clientTransport
517
518 # Wait for the process protocol and test terminal to become
519 # connected before proceeding. The former should always
520 # happen first, but it doesn't hurt to be safe.
521 return defer.gatherResults(filter(None, [
522 processClient.onConnection,
523 testTerminal.expect(">>> ")]))
524
525 def tearDown(self):
526 # Kill the child process. We're done with it.
527 try:
528 self.clientTransport.signalProcess("KILL")
529 except (error.ProcessExitedAlready, OSError):
530 pass
531 def trap(failure):
532 failure.trap(error.ProcessTerminated)
533 self.assertEquals(failure.value.exitCode, None)
534 self.assertEquals(failure.value.status, 9)
535 return self.testTerminal.onDisconnection.addErrback(trap)
536
537 def _testwrite(self, bytes):
538 self.clientTransport.write(bytes)
539
540 class RecvlineLoopbackMixin:
541 serverProtocol = EchoServer
542
543 def testSimple(self):
544 return self._trivialTest(
545 "first line\ndone",
546 [">>> first line",
547 "first line",
548 ">>> done"])
549
550 def testLeftArrow(self):
551 return self._trivialTest(
552 insert + 'first line' + left * 4 + "xxxx\ndone",
553 [">>> first xxxx",
554 "first xxxx",
555 ">>> done"])
556
557 def testRightArrow(self):
558 return self._trivialTest(
559 insert + 'right line' + left * 4 + right * 2 + "xx\ndone",
560 [">>> right lixx",
561 "right lixx",
562 ">>> done"])
563
564 def testBackspace(self):
565 return self._trivialTest(
566 "second line" + backspace * 4 + "xxxx\ndone",
567 [">>> second xxxx",
568 "second xxxx",
569 ">>> done"])
570
571 def testDelete(self):
572 return self._trivialTest(
573 "delete xxxx" + left * 4 + delete * 4 + "line\ndone",
574 [">>> delete line",
575 "delete line",
576 ">>> done"])
577
578 def testInsert(self):
579 return self._trivialTest(
580 "third ine" + left * 3 + "l\ndone",
581 [">>> third line",
582 "third line",
583 ">>> done"])
584
585 def testTypeover(self):
586 return self._trivialTest(
587 "fourth xine" + left * 4 + insert + "l\ndone",
588 [">>> fourth line",
589 "fourth line",
590 ">>> done"])
591
592 def testHome(self):
593 return self._trivialTest(
594 insert + "blah line" + home + "home\ndone",
595 [">>> home line",
596 "home line",
597 ">>> done"])
598
599 def testEnd(self):
600 return self._trivialTest(
601 "end " + left * 4 + end + "line\ndone",
602 [">>> end line",
603 "end line",
604 ">>> done"])
605
606 class RecvlineLoopbackTelnet(_TelnetMixin, unittest.TestCase, RecvlineLoopbackMi xin):
607 pass
608
609 class RecvlineLoopbackSSH(_SSHMixin, unittest.TestCase, RecvlineLoopbackMixin):
610 pass
611
612 class RecvlineLoopbackStdio(_StdioMixin, unittest.TestCase, RecvlineLoopbackMixi n):
613 if stdio is None:
614 skip = "Terminal requirements missing, can't run recvline tests over std io"
615
616
617 class HistoricRecvlineLoopbackMixin:
618 serverProtocol = EchoServer
619
620 def testUpArrow(self):
621 return self._trivialTest(
622 "first line\n" + up + "\ndone",
623 [">>> first line",
624 "first line",
625 ">>> first line",
626 "first line",
627 ">>> done"])
628
629 def testDownArrow(self):
630 return self._trivialTest(
631 "first line\nsecond line\n" + up * 2 + down + "\ndone",
632 [">>> first line",
633 "first line",
634 ">>> second line",
635 "second line",
636 ">>> second line",
637 "second line",
638 ">>> done"])
639
640 class HistoricRecvlineLoopbackTelnet(_TelnetMixin, unittest.TestCase, HistoricRe cvlineLoopbackMixin):
641 pass
642
643 class HistoricRecvlineLoopbackSSH(_SSHMixin, unittest.TestCase, HistoricRecvline LoopbackMixin):
644 pass
645
646 class HistoricRecvlineLoopbackStdio(_StdioMixin, unittest.TestCase, HistoricRecv lineLoopbackMixin):
647 if stdio is None:
648 skip = "Terminal requirements missing, can't run historic recvline tests over stdio"
OLDNEW
« no previous file with comments | « third_party/twisted_8_1/twisted/conch/test/test_mixin.py ('k') | third_party/twisted_8_1/twisted/conch/test/test_ssh.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698