OLD | NEW |
| (Empty) |
1 # -*- test-case-name: twisted.conch.test.test_helper -*- | |
2 # Copyright (c) 2001-2004 Twisted Matrix Laboratories. | |
3 # See LICENSE for details. | |
4 | |
5 """ | |
6 Partial in-memory terminal emulator | |
7 | |
8 @author: U{Jp Calderone<mailto:exarkun@twistedmatrix.com>} | |
9 """ | |
10 | |
11 import re, string | |
12 | |
13 from zope.interface import implements | |
14 | |
15 from twisted.internet import defer, protocol, reactor | |
16 from twisted.python import log | |
17 | |
18 from twisted.conch.insults import insults | |
19 | |
20 FOREGROUND = 30 | |
21 BACKGROUND = 40 | |
22 BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE, N_COLORS = range(9) | |
23 | |
24 class CharacterAttribute: | |
25 """Represents the attributes of a single character. | |
26 | |
27 Character set, intensity, underlinedness, blinkitude, video | |
28 reversal, as well as foreground and background colors made up a | |
29 character's attributes. | |
30 """ | |
31 def __init__(self, charset=insults.G0, | |
32 bold=False, underline=False, | |
33 blink=False, reverseVideo=False, | |
34 foreground=WHITE, background=BLACK, | |
35 | |
36 _subtracting=False): | |
37 self.charset = charset | |
38 self.bold = bold | |
39 self.underline = underline | |
40 self.blink = blink | |
41 self.reverseVideo = reverseVideo | |
42 self.foreground = foreground | |
43 self.background = background | |
44 | |
45 self._subtracting = _subtracting | |
46 | |
47 def __eq__(self, other): | |
48 return vars(self) == vars(other) | |
49 | |
50 def __ne__(self, other): | |
51 return not self.__eq__(other) | |
52 | |
53 def copy(self): | |
54 c = self.__class__() | |
55 c.__dict__.update(vars(self)) | |
56 return c | |
57 | |
58 def wantOne(self, **kw): | |
59 k, v = kw.popitem() | |
60 if getattr(self, k) != v: | |
61 attr = self.copy() | |
62 attr._subtracting = not v | |
63 setattr(attr, k, v) | |
64 return attr | |
65 else: | |
66 return self.copy() | |
67 | |
68 def toVT102(self): | |
69 # Spit out a vt102 control sequence that will set up | |
70 # all the attributes set here. Except charset. | |
71 attrs = [] | |
72 if self._subtracting: | |
73 attrs.append(0) | |
74 if self.bold: | |
75 attrs.append(insults.BOLD) | |
76 if self.underline: | |
77 attrs.append(insults.UNDERLINE) | |
78 if self.blink: | |
79 attrs.append(insults.BLINK) | |
80 if self.reverseVideo: | |
81 attrs.append(insults.REVERSE_VIDEO) | |
82 if self.foreground != WHITE: | |
83 attrs.append(FOREGROUND + self.foreground) | |
84 if self.background != BLACK: | |
85 attrs.append(BACKGROUND + self.background) | |
86 if attrs: | |
87 return '\x1b[' + ';'.join(map(str, attrs)) + 'm' | |
88 return '' | |
89 | |
90 # XXX - need to support scroll regions and scroll history | |
91 class TerminalBuffer(protocol.Protocol): | |
92 """ | |
93 An in-memory terminal emulator. | |
94 """ | |
95 implements(insults.ITerminalTransport) | |
96 | |
97 for keyID in ('UP_ARROW', 'DOWN_ARROW', 'RIGHT_ARROW', 'LEFT_ARROW', | |
98 'HOME', 'INSERT', 'DELETE', 'END', 'PGUP', 'PGDN', | |
99 'F1', 'F2', 'F3', 'F4', 'F5', 'F6', 'F7', 'F8', 'F9', | |
100 'F10', 'F11', 'F12'): | |
101 exec '%s = object()' % (keyID,) | |
102 | |
103 TAB = '\t' | |
104 BACKSPACE = '\x7f' | |
105 | |
106 width = 80 | |
107 height = 24 | |
108 | |
109 fill = ' ' | |
110 void = object() | |
111 | |
112 def getCharacter(self, x, y): | |
113 return self.lines[y][x] | |
114 | |
115 def connectionMade(self): | |
116 self.reset() | |
117 | |
118 def write(self, bytes): | |
119 """ | |
120 Add the given printable bytes to the terminal. | |
121 | |
122 Line feeds in C{bytes} will be replaced with carriage return / line | |
123 feed pairs. | |
124 """ | |
125 for b in bytes.replace('\n', '\r\n'): | |
126 self.insertAtCursor(b) | |
127 | |
128 def _currentCharacterAttributes(self): | |
129 return CharacterAttribute(self.activeCharset, **self.graphicRendition) | |
130 | |
131 def insertAtCursor(self, b): | |
132 """ | |
133 Add one byte to the terminal at the cursor and make consequent state | |
134 updates. | |
135 | |
136 If b is a carriage return, move the cursor to the beginning of the | |
137 current row. | |
138 | |
139 If b is a line feed, move the cursor to the next row or scroll down if | |
140 the cursor is already in the last row. | |
141 | |
142 Otherwise, if b is printable, put it at the cursor position (inserting | |
143 or overwriting as dictated by the current mode) and move the cursor. | |
144 """ | |
145 if b == '\r': | |
146 self.x = 0 | |
147 elif b == '\n': | |
148 self._scrollDown() | |
149 elif b in string.printable: | |
150 if self.x >= self.width: | |
151 self.nextLine() | |
152 ch = (b, self._currentCharacterAttributes()) | |
153 if self.modes.get(insults.modes.IRM): | |
154 self.lines[self.y][self.x:self.x] = [ch] | |
155 self.lines[self.y].pop() | |
156 else: | |
157 self.lines[self.y][self.x] = ch | |
158 self.x += 1 | |
159 | |
160 def _emptyLine(self, width): | |
161 return [(self.void, self._currentCharacterAttributes()) for i in xrange(
width)] | |
162 | |
163 def _scrollDown(self): | |
164 self.y += 1 | |
165 if self.y >= self.height: | |
166 self.y -= 1 | |
167 del self.lines[0] | |
168 self.lines.append(self._emptyLine(self.width)) | |
169 | |
170 def _scrollUp(self): | |
171 self.y -= 1 | |
172 if self.y < 0: | |
173 self.y = 0 | |
174 del self.lines[-1] | |
175 self.lines.insert(0, self._emptyLine(self.width)) | |
176 | |
177 def cursorUp(self, n=1): | |
178 self.y = max(0, self.y - n) | |
179 | |
180 def cursorDown(self, n=1): | |
181 self.y = min(self.height - 1, self.y + n) | |
182 | |
183 def cursorBackward(self, n=1): | |
184 self.x = max(0, self.x - n) | |
185 | |
186 def cursorForward(self, n=1): | |
187 self.x = min(self.width, self.x + n) | |
188 | |
189 def cursorPosition(self, column, line): | |
190 self.x = column | |
191 self.y = line | |
192 | |
193 def cursorHome(self): | |
194 self.x = self.home.x | |
195 self.y = self.home.y | |
196 | |
197 def index(self): | |
198 self._scrollDown() | |
199 | |
200 def reverseIndex(self): | |
201 self._scrollUp() | |
202 | |
203 def nextLine(self): | |
204 """ | |
205 Update the cursor position attributes and scroll down if appropriate. | |
206 """ | |
207 self.x = 0 | |
208 self._scrollDown() | |
209 | |
210 def saveCursor(self): | |
211 self._savedCursor = (self.x, self.y) | |
212 | |
213 def restoreCursor(self): | |
214 self.x, self.y = self._savedCursor | |
215 del self._savedCursor | |
216 | |
217 def setModes(self, modes): | |
218 for m in modes: | |
219 self.modes[m] = True | |
220 | |
221 def resetModes(self, modes): | |
222 for m in modes: | |
223 try: | |
224 del self.modes[m] | |
225 except KeyError: | |
226 pass | |
227 | |
228 | |
229 def setPrivateModes(self, modes): | |
230 """ | |
231 Enable the given modes. | |
232 | |
233 Track which modes have been enabled so that the implementations of | |
234 other L{insults.ITerminalTransport} methods can be properly implemented | |
235 to respect these settings. | |
236 | |
237 @see: L{resetPrivateModes} | |
238 @see: L{insults.ITerminalTransport.setPrivateModes} | |
239 """ | |
240 for m in modes: | |
241 self.privateModes[m] = True | |
242 | |
243 | |
244 def resetPrivateModes(self, modes): | |
245 """ | |
246 Disable the given modes. | |
247 | |
248 @see: L{setPrivateModes} | |
249 @see: L{insults.ITerminalTransport.resetPrivateModes} | |
250 """ | |
251 for m in modes: | |
252 try: | |
253 del self.privateModes[m] | |
254 except KeyError: | |
255 pass | |
256 | |
257 | |
258 def applicationKeypadMode(self): | |
259 self.keypadMode = 'app' | |
260 | |
261 def numericKeypadMode(self): | |
262 self.keypadMode = 'num' | |
263 | |
264 def selectCharacterSet(self, charSet, which): | |
265 self.charsets[which] = charSet | |
266 | |
267 def shiftIn(self): | |
268 self.activeCharset = insults.G0 | |
269 | |
270 def shiftOut(self): | |
271 self.activeCharset = insults.G1 | |
272 | |
273 def singleShift2(self): | |
274 oldActiveCharset = self.activeCharset | |
275 self.activeCharset = insults.G2 | |
276 f = self.insertAtCursor | |
277 def insertAtCursor(b): | |
278 f(b) | |
279 del self.insertAtCursor | |
280 self.activeCharset = oldActiveCharset | |
281 self.insertAtCursor = insertAtCursor | |
282 | |
283 def singleShift3(self): | |
284 oldActiveCharset = self.activeCharset | |
285 self.activeCharset = insults.G3 | |
286 f = self.insertAtCursor | |
287 def insertAtCursor(b): | |
288 f(b) | |
289 del self.insertAtCursor | |
290 self.activeCharset = oldActiveCharset | |
291 self.insertAtCursor = insertAtCursor | |
292 | |
293 def selectGraphicRendition(self, *attributes): | |
294 for a in attributes: | |
295 if a == insults.NORMAL: | |
296 self.graphicRendition = { | |
297 'bold': False, | |
298 'underline': False, | |
299 'blink': False, | |
300 'reverseVideo': False, | |
301 'foreground': WHITE, | |
302 'background': BLACK} | |
303 elif a == insults.BOLD: | |
304 self.graphicRendition['bold'] = True | |
305 elif a == insults.UNDERLINE: | |
306 self.graphicRendition['underline'] = True | |
307 elif a == insults.BLINK: | |
308 self.graphicRendition['blink'] = True | |
309 elif a == insults.REVERSE_VIDEO: | |
310 self.graphicRendition['reverseVideo'] = True | |
311 else: | |
312 try: | |
313 v = int(a) | |
314 except ValueError: | |
315 log.msg("Unknown graphic rendition attribute: " + repr(a)) | |
316 else: | |
317 if FOREGROUND <= v <= FOREGROUND + N_COLORS: | |
318 self.graphicRendition['foreground'] = v - FOREGROUND | |
319 elif BACKGROUND <= v <= BACKGROUND + N_COLORS: | |
320 self.graphicRendition['background'] = v - BACKGROUND | |
321 else: | |
322 log.msg("Unknown graphic rendition attribute: " + repr(a
)) | |
323 | |
324 def eraseLine(self): | |
325 self.lines[self.y] = self._emptyLine(self.width) | |
326 | |
327 def eraseToLineEnd(self): | |
328 width = self.width - self.x | |
329 self.lines[self.y][self.x:] = self._emptyLine(width) | |
330 | |
331 def eraseToLineBeginning(self): | |
332 self.lines[self.y][:self.x + 1] = self._emptyLine(self.x + 1) | |
333 | |
334 def eraseDisplay(self): | |
335 self.lines = [self._emptyLine(self.width) for i in xrange(self.height)] | |
336 | |
337 def eraseToDisplayEnd(self): | |
338 self.eraseToLineEnd() | |
339 height = self.height - self.y - 1 | |
340 self.lines[self.y + 1:] = [self._emptyLine(self.width) for i in range(he
ight)] | |
341 | |
342 def eraseToDisplayBeginning(self): | |
343 self.eraseToLineBeginning() | |
344 self.lines[:self.y] = [self._emptyLine(self.width) for i in range(self.y
)] | |
345 | |
346 def deleteCharacter(self, n=1): | |
347 del self.lines[self.y][self.x:self.x+n] | |
348 self.lines[self.y].extend(self._emptyLine(min(self.width - self.x, n))) | |
349 | |
350 def insertLine(self, n=1): | |
351 self.lines[self.y:self.y] = [self._emptyLine(self.width) for i in range(
n)] | |
352 del self.lines[self.height:] | |
353 | |
354 def deleteLine(self, n=1): | |
355 del self.lines[self.y:self.y+n] | |
356 self.lines.extend([self._emptyLine(self.width) for i in range(n)]) | |
357 | |
358 def reportCursorPosition(self): | |
359 return (self.x, self.y) | |
360 | |
361 def reset(self): | |
362 self.home = insults.Vector(0, 0) | |
363 self.x = self.y = 0 | |
364 self.modes = {} | |
365 self.privateModes = {} | |
366 self.setPrivateModes([insults.privateModes.AUTO_WRAP, | |
367 insults.privateModes.CURSOR_MODE]) | |
368 self.numericKeypad = 'app' | |
369 self.activeCharset = insults.G0 | |
370 self.graphicRendition = { | |
371 'bold': False, | |
372 'underline': False, | |
373 'blink': False, | |
374 'reverseVideo': False, | |
375 'foreground': WHITE, | |
376 'background': BLACK} | |
377 self.charsets = { | |
378 insults.G0: insults.CS_US, | |
379 insults.G1: insults.CS_US, | |
380 insults.G2: insults.CS_ALTERNATE, | |
381 insults.G3: insults.CS_ALTERNATE_SPECIAL} | |
382 self.eraseDisplay() | |
383 | |
384 def unhandledControlSequence(self, buf): | |
385 print 'Could not handle', repr(buf) | |
386 | |
387 def __str__(self): | |
388 lines = [] | |
389 for L in self.lines: | |
390 buf = [] | |
391 length = 0 | |
392 for (ch, attr) in L: | |
393 if ch is not self.void: | |
394 buf.append(ch) | |
395 length = len(buf) | |
396 else: | |
397 buf.append(self.fill) | |
398 lines.append(''.join(buf[:length])) | |
399 return '\n'.join(lines) | |
400 | |
401 class ExpectationTimeout(Exception): | |
402 pass | |
403 | |
404 class ExpectableBuffer(TerminalBuffer): | |
405 _mark = 0 | |
406 | |
407 def connectionMade(self): | |
408 TerminalBuffer.connectionMade(self) | |
409 self._expecting = [] | |
410 | |
411 def write(self, bytes): | |
412 TerminalBuffer.write(self, bytes) | |
413 self._checkExpected() | |
414 | |
415 def cursorHome(self): | |
416 TerminalBuffer.cursorHome(self) | |
417 self._mark = 0 | |
418 | |
419 def _timeoutExpected(self, d): | |
420 d.errback(ExpectationTimeout()) | |
421 self._checkExpected() | |
422 | |
423 def _checkExpected(self): | |
424 s = str(self)[self._mark:] | |
425 while self._expecting: | |
426 expr, timer, deferred = self._expecting[0] | |
427 if timer and not timer.active(): | |
428 del self._expecting[0] | |
429 continue | |
430 for match in expr.finditer(s): | |
431 if timer: | |
432 timer.cancel() | |
433 del self._expecting[0] | |
434 self._mark += match.end() | |
435 s = s[match.end():] | |
436 deferred.callback(match) | |
437 break | |
438 else: | |
439 return | |
440 | |
441 def expect(self, expression, timeout=None, scheduler=reactor): | |
442 d = defer.Deferred() | |
443 timer = None | |
444 if timeout: | |
445 timer = scheduler.callLater(timeout, self._timeoutExpected, d) | |
446 self._expecting.append((re.compile(expression), timer, d)) | |
447 self._checkExpected() | |
448 return d | |
449 | |
450 __all__ = ['CharacterAttribute', 'TerminalBuffer', 'ExpectableBuffer'] | |
OLD | NEW |