OLD | NEW |
| (Empty) |
1 # -*- test-case-name: twisted.conch.test.test_insults -*- | |
2 # Copyright (c) 2001-2004 Twisted Matrix Laboratories. | |
3 # See LICENSE for details. | |
4 | |
5 from twisted.trial import unittest | |
6 from twisted.test.proto_helpers import StringTransport | |
7 | |
8 from twisted.conch.insults.insults import ServerProtocol, ClientProtocol | |
9 from twisted.conch.insults.insults import CS_UK, CS_US, CS_DRAWING, CS_ALTERNATE
, CS_ALTERNATE_SPECIAL | |
10 from twisted.conch.insults.insults import G0, G1 | |
11 from twisted.conch.insults.insults import modes | |
12 | |
13 def _getattr(mock, name): | |
14 return super(Mock, mock).__getattribute__(name) | |
15 | |
16 def occurrences(mock): | |
17 return _getattr(mock, 'occurrences') | |
18 | |
19 def methods(mock): | |
20 return _getattr(mock, 'methods') | |
21 | |
22 def _append(mock, obj): | |
23 occurrences(mock).append(obj) | |
24 | |
25 default = object() | |
26 | |
27 class Mock(object): | |
28 callReturnValue = default | |
29 | |
30 def __init__(self, methods=None, callReturnValue=default): | |
31 """ | |
32 @param methods: Mapping of names to return values | |
33 @param callReturnValue: object __call__ should return | |
34 """ | |
35 self.occurrences = [] | |
36 if methods is None: | |
37 methods = {} | |
38 self.methods = methods | |
39 if callReturnValue is not default: | |
40 self.callReturnValue = callReturnValue | |
41 | |
42 def __call__(self, *a, **kw): | |
43 returnValue = _getattr(self, 'callReturnValue') | |
44 if returnValue is default: | |
45 returnValue = Mock() | |
46 # _getattr(self, 'occurrences').append(('__call__', returnValue, a, kw)) | |
47 _append(self, ('__call__', returnValue, a, kw)) | |
48 return returnValue | |
49 | |
50 def __getattribute__(self, name): | |
51 methods = _getattr(self, 'methods') | |
52 if name in methods: | |
53 attrValue = Mock(callReturnValue=methods[name]) | |
54 else: | |
55 attrValue = Mock() | |
56 # _getattr(self, 'occurrences').append((name, attrValue)) | |
57 _append(self, (name, attrValue)) | |
58 return attrValue | |
59 | |
60 class MockMixin: | |
61 def assertCall(self, occurrence, methodName, args=(), kw={}): | |
62 attr, mock = occurrence | |
63 self.assertEquals(attr, methodName) | |
64 self.assertEquals(len(occurrences(mock)), 1) | |
65 [(call, result, args, kw)] = occurrences(mock) | |
66 self.assertEquals(call, "__call__") | |
67 self.assertEquals(args, args) | |
68 self.assertEquals(kw, kw) | |
69 return result | |
70 | |
71 | |
72 _byteGroupingTestTemplate = """\ | |
73 def testByte%(groupName)s(self): | |
74 transport = StringTransport() | |
75 proto = Mock() | |
76 parser = self.protocolFactory(lambda: proto) | |
77 parser.factory = self | |
78 parser.makeConnection(transport) | |
79 | |
80 bytes = self.TEST_BYTES | |
81 while bytes: | |
82 chunk = bytes[:%(bytesPer)d] | |
83 bytes = bytes[%(bytesPer)d:] | |
84 parser.dataReceived(chunk) | |
85 | |
86 self.verifyResults(transport, proto, parser) | |
87 """ | |
88 class ByteGroupingsMixin(MockMixin): | |
89 protocolFactory = None | |
90 | |
91 for word, n in [('Pairs', 2), ('Triples', 3), ('Quads', 4), ('Quints', 5), (
'Sexes', 6)]: | |
92 exec _byteGroupingTestTemplate % {'groupName': word, 'bytesPer': n} | |
93 del word, n | |
94 | |
95 def verifyResults(self, transport, proto, parser): | |
96 result = self.assertCall(occurrences(proto).pop(0), "makeConnection", (p
arser,)) | |
97 self.assertEquals(occurrences(result), []) | |
98 | |
99 del _byteGroupingTestTemplate | |
100 | |
101 class ServerArrowKeys(ByteGroupingsMixin, unittest.TestCase): | |
102 protocolFactory = ServerProtocol | |
103 | |
104 # All the arrow keys once | |
105 TEST_BYTES = '\x1b[A\x1b[B\x1b[C\x1b[D' | |
106 | |
107 def verifyResults(self, transport, proto, parser): | |
108 ByteGroupingsMixin.verifyResults(self, transport, proto, parser) | |
109 | |
110 for arrow in (parser.UP_ARROW, parser.DOWN_ARROW, | |
111 parser.RIGHT_ARROW, parser.LEFT_ARROW): | |
112 result = self.assertCall(occurrences(proto).pop(0), "keystrokeReceiv
ed", (arrow, None)) | |
113 self.assertEquals(occurrences(result), []) | |
114 self.failIf(occurrences(proto)) | |
115 | |
116 | |
117 class PrintableCharacters(ByteGroupingsMixin, unittest.TestCase): | |
118 protocolFactory = ServerProtocol | |
119 | |
120 # Some letters and digits, first on their own, then capitalized, | |
121 # then modified with alt | |
122 | |
123 TEST_BYTES = 'abc123ABC!@#\x1ba\x1bb\x1bc\x1b1\x1b2\x1b3' | |
124 | |
125 def verifyResults(self, transport, proto, parser): | |
126 ByteGroupingsMixin.verifyResults(self, transport, proto, parser) | |
127 | |
128 for char in 'abc123ABC!@#': | |
129 result = self.assertCall(occurrences(proto).pop(0), "keystrokeReceiv
ed", (char, None)) | |
130 self.assertEquals(occurrences(result), []) | |
131 | |
132 for char in 'abc123': | |
133 result = self.assertCall(occurrences(proto).pop(0), "keystrokeReceiv
ed", (char, parser.ALT)) | |
134 self.assertEquals(occurrences(result), []) | |
135 | |
136 occs = occurrences(proto) | |
137 self.failIf(occs, "%r should have been []" % (occs,)) | |
138 | |
139 class ServerFunctionKeys(ByteGroupingsMixin, unittest.TestCase): | |
140 """Test for parsing and dispatching function keys (F1 - F12) | |
141 """ | |
142 protocolFactory = ServerProtocol | |
143 | |
144 byteList = [] | |
145 for bytes in ('OP', 'OQ', 'OR', 'OS', # F1 - F4 | |
146 '15~', '17~', '18~', '19~', # F5 - F8 | |
147 '20~', '21~', '23~', '24~'): # F9 - F12 | |
148 byteList.append('\x1b[' + bytes) | |
149 TEST_BYTES = ''.join(byteList) | |
150 del byteList, bytes | |
151 | |
152 def verifyResults(self, transport, proto, parser): | |
153 ByteGroupingsMixin.verifyResults(self, transport, proto, parser) | |
154 for funcNum in range(1, 13): | |
155 funcArg = getattr(parser, 'F%d' % (funcNum,)) | |
156 result = self.assertCall(occurrences(proto).pop(0), "keystrokeReceiv
ed", (funcArg, None)) | |
157 self.assertEquals(occurrences(result), []) | |
158 self.failIf(occurrences(proto)) | |
159 | |
160 class ClientCursorMovement(ByteGroupingsMixin, unittest.TestCase): | |
161 protocolFactory = ClientProtocol | |
162 | |
163 d2 = "\x1b[2B" | |
164 r4 = "\x1b[4C" | |
165 u1 = "\x1b[A" | |
166 l2 = "\x1b[2D" | |
167 # Move the cursor down two, right four, up one, left two, up one, left two | |
168 TEST_BYTES = d2 + r4 + u1 + l2 + u1 + l2 | |
169 del d2, r4, u1, l2 | |
170 | |
171 def verifyResults(self, transport, proto, parser): | |
172 ByteGroupingsMixin.verifyResults(self, transport, proto, parser) | |
173 | |
174 for (method, count) in [('Down', 2), ('Forward', 4), ('Up', 1), | |
175 ('Backward', 2), ('Up', 1), ('Backward', 2)]: | |
176 result = self.assertCall(occurrences(proto).pop(0), "cursor" + metho
d, (count,)) | |
177 self.assertEquals(occurrences(result), []) | |
178 self.failIf(occurrences(proto)) | |
179 | |
180 class ClientControlSequences(unittest.TestCase, MockMixin): | |
181 def setUp(self): | |
182 self.transport = StringTransport() | |
183 self.proto = Mock() | |
184 self.parser = ClientProtocol(lambda: self.proto) | |
185 self.parser.factory = self | |
186 self.parser.makeConnection(self.transport) | |
187 result = self.assertCall(occurrences(self.proto).pop(0), "makeConnection
", (self.parser,)) | |
188 self.failIf(occurrences(result)) | |
189 | |
190 def testSimpleCardinals(self): | |
191 self.parser.dataReceived( | |
192 ''.join([''.join(['\x1b[' + str(n) + ch for n in ('', 2, 20, 200)])
for ch in 'BACD'])) | |
193 occs = occurrences(self.proto) | |
194 | |
195 for meth in ("Down", "Up", "Forward", "Backward"): | |
196 for count in (1, 2, 20, 200): | |
197 result = self.assertCall(occs.pop(0), "cursor" + meth, (count,)) | |
198 self.failIf(occurrences(result)) | |
199 self.failIf(occs) | |
200 | |
201 def testScrollRegion(self): | |
202 self.parser.dataReceived('\x1b[5;22r\x1b[r') | |
203 occs = occurrences(self.proto) | |
204 | |
205 result = self.assertCall(occs.pop(0), "setScrollRegion", (5, 22)) | |
206 self.failIf(occurrences(result)) | |
207 | |
208 result = self.assertCall(occs.pop(0), "setScrollRegion", (None, None)) | |
209 self.failIf(occurrences(result)) | |
210 self.failIf(occs) | |
211 | |
212 def testHeightAndWidth(self): | |
213 self.parser.dataReceived("\x1b#3\x1b#4\x1b#5\x1b#6") | |
214 occs = occurrences(self.proto) | |
215 | |
216 result = self.assertCall(occs.pop(0), "doubleHeightLine", (True,)) | |
217 self.failIf(occurrences(result)) | |
218 | |
219 result = self.assertCall(occs.pop(0), "doubleHeightLine", (False,)) | |
220 self.failIf(occurrences(result)) | |
221 | |
222 result = self.assertCall(occs.pop(0), "singleWidthLine") | |
223 self.failIf(occurrences(result)) | |
224 | |
225 result = self.assertCall(occs.pop(0), "doubleWidthLine") | |
226 self.failIf(occurrences(result)) | |
227 self.failIf(occs) | |
228 | |
229 def testCharacterSet(self): | |
230 self.parser.dataReceived( | |
231 ''.join([''.join(['\x1b' + g + n for n in 'AB012']) for g in '()'])) | |
232 occs = occurrences(self.proto) | |
233 | |
234 for which in (G0, G1): | |
235 for charset in (CS_UK, CS_US, CS_DRAWING, CS_ALTERNATE, CS_ALTERNATE
_SPECIAL): | |
236 result = self.assertCall(occs.pop(0), "selectCharacterSet", (cha
rset, which)) | |
237 self.failIf(occurrences(result)) | |
238 self.failIf(occs) | |
239 | |
240 def testShifting(self): | |
241 self.parser.dataReceived("\x15\x14") | |
242 occs = occurrences(self.proto) | |
243 | |
244 result = self.assertCall(occs.pop(0), "shiftIn") | |
245 self.failIf(occurrences(result)) | |
246 | |
247 result = self.assertCall(occs.pop(0), "shiftOut") | |
248 self.failIf(occurrences(result)) | |
249 self.failIf(occs) | |
250 | |
251 def testSingleShifts(self): | |
252 self.parser.dataReceived("\x1bN\x1bO") | |
253 occs = occurrences(self.proto) | |
254 | |
255 result = self.assertCall(occs.pop(0), "singleShift2") | |
256 self.failIf(occurrences(result)) | |
257 | |
258 result = self.assertCall(occs.pop(0), "singleShift3") | |
259 self.failIf(occurrences(result)) | |
260 self.failIf(occs) | |
261 | |
262 def testKeypadMode(self): | |
263 self.parser.dataReceived("\x1b=\x1b>") | |
264 occs = occurrences(self.proto) | |
265 | |
266 result = self.assertCall(occs.pop(0), "applicationKeypadMode") | |
267 self.failIf(occurrences(result)) | |
268 | |
269 result = self.assertCall(occs.pop(0), "numericKeypadMode") | |
270 self.failIf(occurrences(result)) | |
271 self.failIf(occs) | |
272 | |
273 def testCursor(self): | |
274 self.parser.dataReceived("\x1b7\x1b8") | |
275 occs = occurrences(self.proto) | |
276 | |
277 result = self.assertCall(occs.pop(0), "saveCursor") | |
278 self.failIf(occurrences(result)) | |
279 | |
280 result = self.assertCall(occs.pop(0), "restoreCursor") | |
281 self.failIf(occurrences(result)) | |
282 self.failIf(occs) | |
283 | |
284 def testReset(self): | |
285 self.parser.dataReceived("\x1bc") | |
286 occs = occurrences(self.proto) | |
287 | |
288 result = self.assertCall(occs.pop(0), "reset") | |
289 self.failIf(occurrences(result)) | |
290 self.failIf(occs) | |
291 | |
292 def testIndex(self): | |
293 self.parser.dataReceived("\x1bD\x1bM\x1bE") | |
294 occs = occurrences(self.proto) | |
295 | |
296 result = self.assertCall(occs.pop(0), "index") | |
297 self.failIf(occurrences(result)) | |
298 | |
299 result = self.assertCall(occs.pop(0), "reverseIndex") | |
300 self.failIf(occurrences(result)) | |
301 | |
302 result = self.assertCall(occs.pop(0), "nextLine") | |
303 self.failIf(occurrences(result)) | |
304 self.failIf(occs) | |
305 | |
306 def testModes(self): | |
307 self.parser.dataReceived( | |
308 "\x1b[" + ';'.join(map(str, [modes.KAM, modes.IRM, modes.LNM])) + "h
") | |
309 self.parser.dataReceived( | |
310 "\x1b[" + ';'.join(map(str, [modes.KAM, modes.IRM, modes.LNM])) + "l
") | |
311 occs = occurrences(self.proto) | |
312 | |
313 result = self.assertCall(occs.pop(0), "setModes", ([modes.KAM, modes.IRM
, modes.LNM],)) | |
314 self.failIf(occurrences(result)) | |
315 | |
316 result = self.assertCall(occs.pop(0), "resetModes", ([modes.KAM, modes.I
RM, modes.LNM],)) | |
317 self.failIf(occurrences(result)) | |
318 self.failIf(occs) | |
319 | |
320 def testErasure(self): | |
321 self.parser.dataReceived( | |
322 "\x1b[K\x1b[1K\x1b[2K\x1b[J\x1b[1J\x1b[2J\x1b[3P") | |
323 occs = occurrences(self.proto) | |
324 | |
325 for meth in ("eraseToLineEnd", "eraseToLineBeginning", "eraseLine", | |
326 "eraseToDisplayEnd", "eraseToDisplayBeginning", | |
327 "eraseDisplay"): | |
328 result = self.assertCall(occs.pop(0), meth) | |
329 self.failIf(occurrences(result)) | |
330 | |
331 result = self.assertCall(occs.pop(0), "deleteCharacter", (3,)) | |
332 self.failIf(occurrences(result)) | |
333 self.failIf(occs) | |
334 | |
335 def testLineDeletion(self): | |
336 self.parser.dataReceived("\x1b[M\x1b[3M") | |
337 occs = occurrences(self.proto) | |
338 | |
339 for arg in (1, 3): | |
340 result = self.assertCall(occs.pop(0), "deleteLine", (arg,)) | |
341 self.failIf(occurrences(result)) | |
342 self.failIf(occs) | |
343 | |
344 def testLineInsertion(self): | |
345 self.parser.dataReceived("\x1b[L\x1b[3L") | |
346 occs = occurrences(self.proto) | |
347 | |
348 for arg in (1, 3): | |
349 result = self.assertCall(occs.pop(0), "insertLine", (arg,)) | |
350 self.failIf(occurrences(result)) | |
351 self.failIf(occs) | |
352 | |
353 def testCursorPosition(self): | |
354 methods(self.proto)['reportCursorPosition'] = (6, 7) | |
355 self.parser.dataReceived("\x1b[6n") | |
356 self.assertEquals(self.transport.value(), "\x1b[7;8R") | |
357 occs = occurrences(self.proto) | |
358 | |
359 result = self.assertCall(occs.pop(0), "reportCursorPosition") | |
360 # This isn't really an interesting assert, since it only tests that | |
361 # our mock setup is working right, but I'll include it anyway. | |
362 self.assertEquals(result, (6, 7)) | |
363 | |
364 | |
365 | |
366 class ServerProtocolOutputTests(unittest.TestCase): | |
367 """ | |
368 Tests for the bytes L{ServerProtocol} writes to its transport when its | |
369 methods are called. | |
370 """ | |
371 def test_nextLine(self): | |
372 """ | |
373 L{ServerProtocol.nextLine} writes C{"\r\n"} to its transport. | |
374 """ | |
375 # Why doesn't it write ESC E? Because ESC E is poorly supported. For | |
376 # example, gnome-terminal (many different versions) fails to scroll if | |
377 # it receives ESC E and the cursor is already on the last row. | |
378 protocol = ServerProtocol() | |
379 transport = StringTransport() | |
380 protocol.makeConnection(transport) | |
381 protocol.nextLine() | |
382 self.assertEqual(transport.value(), "\r\n") | |
OLD | NEW |