OLD | NEW |
| (Empty) |
1 # -*- test-case-name: twisted.conch.test.test_manhole -*- | |
2 # Copyright (c) 2001-2007 Twisted Matrix Laboratories. | |
3 # See LICENSE for details. | |
4 | |
5 """ | |
6 Tests for L{twisted.conch.manhole}. | |
7 """ | |
8 | |
9 import traceback | |
10 | |
11 from twisted.trial import unittest | |
12 from twisted.internet import error, defer | |
13 from twisted.test.proto_helpers import StringTransport | |
14 from twisted.conch.test.test_recvline import _TelnetMixin, _SSHMixin, _StdioMixi
n, stdio, ssh | |
15 from twisted.conch import manhole | |
16 from twisted.conch.insults import insults | |
17 | |
18 | |
19 def determineDefaultFunctionName(): | |
20 """ | |
21 Return the string used by Python as the name for code objects which are | |
22 compiled from interactive input or at the top-level of modules. | |
23 """ | |
24 try: | |
25 1 / 0 | |
26 except: | |
27 return traceback.extract_stack()[0][2] | |
28 defaultFunctionName = determineDefaultFunctionName() | |
29 | |
30 | |
31 | |
32 class ManholeInterpreterTests(unittest.TestCase): | |
33 """ | |
34 Tests for L{manhole.ManholeInterpreter}. | |
35 """ | |
36 def test_resetBuffer(self): | |
37 """ | |
38 L{ManholeInterpreter.resetBuffer} should empty the input buffer. | |
39 """ | |
40 interpreter = manhole.ManholeInterpreter(None) | |
41 interpreter.buffer.extend(["1", "2"]) | |
42 interpreter.resetBuffer() | |
43 self.assertFalse(interpreter.buffer) | |
44 | |
45 | |
46 | |
47 class ManholeProtocolTests(unittest.TestCase): | |
48 """ | |
49 Tests for L{manhole.Manhole}. | |
50 """ | |
51 def test_interruptResetsInterpreterBuffer(self): | |
52 """ | |
53 L{manhole.Manhole.handle_INT} should cause the interpreter input buffer | |
54 to be reset. | |
55 """ | |
56 transport = StringTransport() | |
57 terminal = insults.ServerProtocol(manhole.Manhole) | |
58 terminal.makeConnection(transport) | |
59 protocol = terminal.terminalProtocol | |
60 interpreter = protocol.interpreter | |
61 interpreter.buffer.extend(["1", "2"]) | |
62 protocol.handle_INT() | |
63 self.assertFalse(interpreter.buffer) | |
64 | |
65 | |
66 | |
67 class WriterTestCase(unittest.TestCase): | |
68 def testInteger(self): | |
69 manhole.lastColorizedLine("1") | |
70 | |
71 | |
72 def testDoubleQuoteString(self): | |
73 manhole.lastColorizedLine('"1"') | |
74 | |
75 | |
76 def testSingleQuoteString(self): | |
77 manhole.lastColorizedLine("'1'") | |
78 | |
79 | |
80 def testTripleSingleQuotedString(self): | |
81 manhole.lastColorizedLine("'''1'''") | |
82 | |
83 | |
84 def testTripleDoubleQuotedString(self): | |
85 manhole.lastColorizedLine('"""1"""') | |
86 | |
87 | |
88 def testFunctionDefinition(self): | |
89 manhole.lastColorizedLine("def foo():") | |
90 | |
91 | |
92 def testClassDefinition(self): | |
93 manhole.lastColorizedLine("class foo:") | |
94 | |
95 | |
96 class ManholeLoopbackMixin: | |
97 serverProtocol = manhole.ColoredManhole | |
98 | |
99 def wfd(self, d): | |
100 return defer.waitForDeferred(d) | |
101 | |
102 def testSimpleExpression(self): | |
103 done = self.recvlineClient.expect("done") | |
104 | |
105 self._testwrite( | |
106 "1 + 1\n" | |
107 "done") | |
108 | |
109 def finished(ign): | |
110 self._assertBuffer( | |
111 [">>> 1 + 1", | |
112 "2", | |
113 ">>> done"]) | |
114 | |
115 return done.addCallback(finished) | |
116 | |
117 def testTripleQuoteLineContinuation(self): | |
118 done = self.recvlineClient.expect("done") | |
119 | |
120 self._testwrite( | |
121 "'''\n'''\n" | |
122 "done") | |
123 | |
124 def finished(ign): | |
125 self._assertBuffer( | |
126 [">>> '''", | |
127 "... '''", | |
128 "'\\n'", | |
129 ">>> done"]) | |
130 | |
131 return done.addCallback(finished) | |
132 | |
133 def testFunctionDefinition(self): | |
134 done = self.recvlineClient.expect("done") | |
135 | |
136 self._testwrite( | |
137 "def foo(bar):\n" | |
138 "\tprint bar\n\n" | |
139 "foo(42)\n" | |
140 "done") | |
141 | |
142 def finished(ign): | |
143 self._assertBuffer( | |
144 [">>> def foo(bar):", | |
145 "... print bar", | |
146 "... ", | |
147 ">>> foo(42)", | |
148 "42", | |
149 ">>> done"]) | |
150 | |
151 return done.addCallback(finished) | |
152 | |
153 def testClassDefinition(self): | |
154 done = self.recvlineClient.expect("done") | |
155 | |
156 self._testwrite( | |
157 "class Foo:\n" | |
158 "\tdef bar(self):\n" | |
159 "\t\tprint 'Hello, world!'\n\n" | |
160 "Foo().bar()\n" | |
161 "done") | |
162 | |
163 def finished(ign): | |
164 self._assertBuffer( | |
165 [">>> class Foo:", | |
166 "... def bar(self):", | |
167 "... print 'Hello, world!'", | |
168 "... ", | |
169 ">>> Foo().bar()", | |
170 "Hello, world!", | |
171 ">>> done"]) | |
172 | |
173 return done.addCallback(finished) | |
174 | |
175 def testException(self): | |
176 done = self.recvlineClient.expect("done") | |
177 | |
178 self._testwrite( | |
179 "1 / 0\n" | |
180 "done") | |
181 | |
182 def finished(ign): | |
183 self._assertBuffer( | |
184 [">>> 1 / 0", | |
185 "Traceback (most recent call last):", | |
186 ' File "<console>", line 1, in ' + defaultFunctionName, | |
187 "ZeroDivisionError: integer division or modulo by zero", | |
188 ">>> done"]) | |
189 | |
190 return done.addCallback(finished) | |
191 | |
192 def testControlC(self): | |
193 done = self.recvlineClient.expect("done") | |
194 | |
195 self._testwrite( | |
196 "cancelled line" + manhole.CTRL_C + | |
197 "done") | |
198 | |
199 def finished(ign): | |
200 self._assertBuffer( | |
201 [">>> cancelled line", | |
202 "KeyboardInterrupt", | |
203 ">>> done"]) | |
204 | |
205 return done.addCallback(finished) | |
206 | |
207 | |
208 def test_interruptDuringContinuation(self): | |
209 """ | |
210 Sending ^C to Manhole while in a state where more input is required to | |
211 complete a statement should discard the entire ongoing statement and | |
212 reset the input prompt to the non-continuation prompt. | |
213 """ | |
214 continuing = self.recvlineClient.expect("things") | |
215 | |
216 self._testwrite("(\nthings") | |
217 | |
218 def gotContinuation(ignored): | |
219 self._assertBuffer( | |
220 [">>> (", | |
221 "... things"]) | |
222 interrupted = self.recvlineClient.expect(">>> ") | |
223 self._testwrite(manhole.CTRL_C) | |
224 return interrupted | |
225 continuing.addCallback(gotContinuation) | |
226 | |
227 def gotInterruption(ignored): | |
228 self._assertBuffer( | |
229 [">>> (", | |
230 "... things", | |
231 "KeyboardInterrupt", | |
232 ">>> "]) | |
233 continuing.addCallback(gotInterruption) | |
234 return continuing | |
235 | |
236 | |
237 def testControlBackslash(self): | |
238 self._testwrite("cancelled line") | |
239 partialLine = self.recvlineClient.expect("cancelled line") | |
240 | |
241 def gotPartialLine(ign): | |
242 self._assertBuffer( | |
243 [">>> cancelled line"]) | |
244 self._testwrite(manhole.CTRL_BACKSLASH) | |
245 | |
246 d = self.recvlineClient.onDisconnection | |
247 return self.assertFailure(d, error.ConnectionDone) | |
248 | |
249 def gotClearedLine(ign): | |
250 self._assertBuffer( | |
251 [""]) | |
252 | |
253 return partialLine.addCallback(gotPartialLine).addCallback(gotClearedLin
e) | |
254 | |
255 def testControlD(self): | |
256 self._testwrite("1 + 1") | |
257 helloWorld = self.wfd(self.recvlineClient.expect(r"\+ 1")) | |
258 yield helloWorld | |
259 helloWorld.getResult() | |
260 self._assertBuffer([">>> 1 + 1"]) | |
261 | |
262 self._testwrite(manhole.CTRL_D + " + 1") | |
263 cleared = self.wfd(self.recvlineClient.expect(r"\+ 1")) | |
264 yield cleared | |
265 cleared.getResult() | |
266 self._assertBuffer([">>> 1 + 1 + 1"]) | |
267 | |
268 self._testwrite("\n") | |
269 printed = self.wfd(self.recvlineClient.expect("3\n>>> ")) | |
270 yield printed | |
271 printed.getResult() | |
272 | |
273 self._testwrite(manhole.CTRL_D) | |
274 d = self.recvlineClient.onDisconnection | |
275 disconnected = self.wfd(self.assertFailure(d, error.ConnectionDone)) | |
276 yield disconnected | |
277 disconnected.getResult() | |
278 testControlD = defer.deferredGenerator(testControlD) | |
279 | |
280 | |
281 def testControlL(self): | |
282 """ | |
283 CTRL+L is generally used as a redraw-screen command in terminal | |
284 applications. Manhole doesn't currently respect this usage of it, | |
285 but it should at least do something reasonable in response to this | |
286 event (rather than, say, eating your face). | |
287 """ | |
288 # Start off with a newline so that when we clear the display we can | |
289 # tell by looking for the missing first empty prompt line. | |
290 self._testwrite("\n1 + 1") | |
291 helloWorld = self.wfd(self.recvlineClient.expect(r"\+ 1")) | |
292 yield helloWorld | |
293 helloWorld.getResult() | |
294 self._assertBuffer([">>> ", ">>> 1 + 1"]) | |
295 | |
296 self._testwrite(manhole.CTRL_L + " + 1") | |
297 redrew = self.wfd(self.recvlineClient.expect(r"1 \+ 1 \+ 1")) | |
298 yield redrew | |
299 redrew.getResult() | |
300 self._assertBuffer([">>> 1 + 1 + 1"]) | |
301 testControlL = defer.deferredGenerator(testControlL) | |
302 | |
303 | |
304 def testDeferred(self): | |
305 self._testwrite( | |
306 "from twisted.internet import defer, reactor\n" | |
307 "d = defer.Deferred()\n" | |
308 "d\n") | |
309 | |
310 deferred = self.wfd(self.recvlineClient.expect("<Deferred #0>")) | |
311 yield deferred | |
312 deferred.getResult() | |
313 | |
314 self._testwrite( | |
315 "c = reactor.callLater(0.1, d.callback, 'Hi!')\n") | |
316 delayed = self.wfd(self.recvlineClient.expect(">>> ")) | |
317 yield delayed | |
318 delayed.getResult() | |
319 | |
320 called = self.wfd(self.recvlineClient.expect("Deferred #0 called back: '
Hi!'\n>>> ")) | |
321 yield called | |
322 called.getResult() | |
323 self._assertBuffer( | |
324 [">>> from twisted.internet import defer, reactor", | |
325 ">>> d = defer.Deferred()", | |
326 ">>> d", | |
327 "<Deferred #0>", | |
328 ">>> c = reactor.callLater(0.1, d.callback, 'Hi!')", | |
329 "Deferred #0 called back: 'Hi!'", | |
330 ">>> "]) | |
331 | |
332 testDeferred = defer.deferredGenerator(testDeferred) | |
333 | |
334 class ManholeLoopbackTelnet(_TelnetMixin, unittest.TestCase, ManholeLoopbackMixi
n): | |
335 pass | |
336 | |
337 class ManholeLoopbackSSH(_SSHMixin, unittest.TestCase, ManholeLoopbackMixin): | |
338 if ssh is None: | |
339 skip = "Crypto requirements missing, can't run manhole tests over ssh" | |
340 | |
341 class ManholeLoopbackStdio(_StdioMixin, unittest.TestCase, ManholeLoopbackMixin)
: | |
342 if stdio is None: | |
343 skip = "Terminal requirements missing, can't run manhole tests over stdi
o" | |
344 else: | |
345 serverProtocol = stdio.ConsoleManhole | |
OLD | NEW |