| OLD | NEW |
| (Empty) |
| 1 # -*- test-case-name: twisted.conch.test.test_helper -*- | |
| 2 # Copyright (c) 2001-2004 Twisted Matrix Laboratories. | |
| 3 # See LICENSE for details. | |
| 4 | |
| 5 """ | |
| 6 Partial in-memory terminal emulator | |
| 7 | |
| 8 @author: U{Jp Calderone<mailto:exarkun@twistedmatrix.com>} | |
| 9 """ | |
| 10 | |
| 11 import re, string | |
| 12 | |
| 13 from zope.interface import implements | |
| 14 | |
| 15 from twisted.internet import defer, protocol, reactor | |
| 16 from twisted.python import log | |
| 17 | |
| 18 from twisted.conch.insults import insults | |
| 19 | |
| 20 FOREGROUND = 30 | |
| 21 BACKGROUND = 40 | |
| 22 BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE, N_COLORS = range(9) | |
| 23 | |
| 24 class CharacterAttribute: | |
| 25 """Represents the attributes of a single character. | |
| 26 | |
| 27 Character set, intensity, underlinedness, blinkitude, video | |
| 28 reversal, as well as foreground and background colors made up a | |
| 29 character's attributes. | |
| 30 """ | |
| 31 def __init__(self, charset=insults.G0, | |
| 32 bold=False, underline=False, | |
| 33 blink=False, reverseVideo=False, | |
| 34 foreground=WHITE, background=BLACK, | |
| 35 | |
| 36 _subtracting=False): | |
| 37 self.charset = charset | |
| 38 self.bold = bold | |
| 39 self.underline = underline | |
| 40 self.blink = blink | |
| 41 self.reverseVideo = reverseVideo | |
| 42 self.foreground = foreground | |
| 43 self.background = background | |
| 44 | |
| 45 self._subtracting = _subtracting | |
| 46 | |
| 47 def __eq__(self, other): | |
| 48 return vars(self) == vars(other) | |
| 49 | |
| 50 def __ne__(self, other): | |
| 51 return not self.__eq__(other) | |
| 52 | |
| 53 def copy(self): | |
| 54 c = self.__class__() | |
| 55 c.__dict__.update(vars(self)) | |
| 56 return c | |
| 57 | |
| 58 def wantOne(self, **kw): | |
| 59 k, v = kw.popitem() | |
| 60 if getattr(self, k) != v: | |
| 61 attr = self.copy() | |
| 62 attr._subtracting = not v | |
| 63 setattr(attr, k, v) | |
| 64 return attr | |
| 65 else: | |
| 66 return self.copy() | |
| 67 | |
| 68 def toVT102(self): | |
| 69 # Spit out a vt102 control sequence that will set up | |
| 70 # all the attributes set here. Except charset. | |
| 71 attrs = [] | |
| 72 if self._subtracting: | |
| 73 attrs.append(0) | |
| 74 if self.bold: | |
| 75 attrs.append(insults.BOLD) | |
| 76 if self.underline: | |
| 77 attrs.append(insults.UNDERLINE) | |
| 78 if self.blink: | |
| 79 attrs.append(insults.BLINK) | |
| 80 if self.reverseVideo: | |
| 81 attrs.append(insults.REVERSE_VIDEO) | |
| 82 if self.foreground != WHITE: | |
| 83 attrs.append(FOREGROUND + self.foreground) | |
| 84 if self.background != BLACK: | |
| 85 attrs.append(BACKGROUND + self.background) | |
| 86 if attrs: | |
| 87 return '\x1b[' + ';'.join(map(str, attrs)) + 'm' | |
| 88 return '' | |
| 89 | |
| 90 # XXX - need to support scroll regions and scroll history | |
| 91 class TerminalBuffer(protocol.Protocol): | |
| 92 """ | |
| 93 An in-memory terminal emulator. | |
| 94 """ | |
| 95 implements(insults.ITerminalTransport) | |
| 96 | |
| 97 for keyID in ('UP_ARROW', 'DOWN_ARROW', 'RIGHT_ARROW', 'LEFT_ARROW', | |
| 98 'HOME', 'INSERT', 'DELETE', 'END', 'PGUP', 'PGDN', | |
| 99 'F1', 'F2', 'F3', 'F4', 'F5', 'F6', 'F7', 'F8', 'F9', | |
| 100 'F10', 'F11', 'F12'): | |
| 101 exec '%s = object()' % (keyID,) | |
| 102 | |
| 103 TAB = '\t' | |
| 104 BACKSPACE = '\x7f' | |
| 105 | |
| 106 width = 80 | |
| 107 height = 24 | |
| 108 | |
| 109 fill = ' ' | |
| 110 void = object() | |
| 111 | |
| 112 def getCharacter(self, x, y): | |
| 113 return self.lines[y][x] | |
| 114 | |
| 115 def connectionMade(self): | |
| 116 self.reset() | |
| 117 | |
| 118 def write(self, bytes): | |
| 119 """ | |
| 120 Add the given printable bytes to the terminal. | |
| 121 | |
| 122 Line feeds in C{bytes} will be replaced with carriage return / line | |
| 123 feed pairs. | |
| 124 """ | |
| 125 for b in bytes.replace('\n', '\r\n'): | |
| 126 self.insertAtCursor(b) | |
| 127 | |
| 128 def _currentCharacterAttributes(self): | |
| 129 return CharacterAttribute(self.activeCharset, **self.graphicRendition) | |
| 130 | |
| 131 def insertAtCursor(self, b): | |
| 132 """ | |
| 133 Add one byte to the terminal at the cursor and make consequent state | |
| 134 updates. | |
| 135 | |
| 136 If b is a carriage return, move the cursor to the beginning of the | |
| 137 current row. | |
| 138 | |
| 139 If b is a line feed, move the cursor to the next row or scroll down if | |
| 140 the cursor is already in the last row. | |
| 141 | |
| 142 Otherwise, if b is printable, put it at the cursor position (inserting | |
| 143 or overwriting as dictated by the current mode) and move the cursor. | |
| 144 """ | |
| 145 if b == '\r': | |
| 146 self.x = 0 | |
| 147 elif b == '\n': | |
| 148 self._scrollDown() | |
| 149 elif b in string.printable: | |
| 150 if self.x >= self.width: | |
| 151 self.nextLine() | |
| 152 ch = (b, self._currentCharacterAttributes()) | |
| 153 if self.modes.get(insults.modes.IRM): | |
| 154 self.lines[self.y][self.x:self.x] = [ch] | |
| 155 self.lines[self.y].pop() | |
| 156 else: | |
| 157 self.lines[self.y][self.x] = ch | |
| 158 self.x += 1 | |
| 159 | |
| 160 def _emptyLine(self, width): | |
| 161 return [(self.void, self._currentCharacterAttributes()) for i in xrange(
width)] | |
| 162 | |
| 163 def _scrollDown(self): | |
| 164 self.y += 1 | |
| 165 if self.y >= self.height: | |
| 166 self.y -= 1 | |
| 167 del self.lines[0] | |
| 168 self.lines.append(self._emptyLine(self.width)) | |
| 169 | |
| 170 def _scrollUp(self): | |
| 171 self.y -= 1 | |
| 172 if self.y < 0: | |
| 173 self.y = 0 | |
| 174 del self.lines[-1] | |
| 175 self.lines.insert(0, self._emptyLine(self.width)) | |
| 176 | |
| 177 def cursorUp(self, n=1): | |
| 178 self.y = max(0, self.y - n) | |
| 179 | |
| 180 def cursorDown(self, n=1): | |
| 181 self.y = min(self.height - 1, self.y + n) | |
| 182 | |
| 183 def cursorBackward(self, n=1): | |
| 184 self.x = max(0, self.x - n) | |
| 185 | |
| 186 def cursorForward(self, n=1): | |
| 187 self.x = min(self.width, self.x + n) | |
| 188 | |
| 189 def cursorPosition(self, column, line): | |
| 190 self.x = column | |
| 191 self.y = line | |
| 192 | |
| 193 def cursorHome(self): | |
| 194 self.x = self.home.x | |
| 195 self.y = self.home.y | |
| 196 | |
| 197 def index(self): | |
| 198 self._scrollDown() | |
| 199 | |
| 200 def reverseIndex(self): | |
| 201 self._scrollUp() | |
| 202 | |
| 203 def nextLine(self): | |
| 204 """ | |
| 205 Update the cursor position attributes and scroll down if appropriate. | |
| 206 """ | |
| 207 self.x = 0 | |
| 208 self._scrollDown() | |
| 209 | |
| 210 def saveCursor(self): | |
| 211 self._savedCursor = (self.x, self.y) | |
| 212 | |
| 213 def restoreCursor(self): | |
| 214 self.x, self.y = self._savedCursor | |
| 215 del self._savedCursor | |
| 216 | |
| 217 def setModes(self, modes): | |
| 218 for m in modes: | |
| 219 self.modes[m] = True | |
| 220 | |
| 221 def resetModes(self, modes): | |
| 222 for m in modes: | |
| 223 try: | |
| 224 del self.modes[m] | |
| 225 except KeyError: | |
| 226 pass | |
| 227 | |
| 228 | |
| 229 def setPrivateModes(self, modes): | |
| 230 """ | |
| 231 Enable the given modes. | |
| 232 | |
| 233 Track which modes have been enabled so that the implementations of | |
| 234 other L{insults.ITerminalTransport} methods can be properly implemented | |
| 235 to respect these settings. | |
| 236 | |
| 237 @see: L{resetPrivateModes} | |
| 238 @see: L{insults.ITerminalTransport.setPrivateModes} | |
| 239 """ | |
| 240 for m in modes: | |
| 241 self.privateModes[m] = True | |
| 242 | |
| 243 | |
| 244 def resetPrivateModes(self, modes): | |
| 245 """ | |
| 246 Disable the given modes. | |
| 247 | |
| 248 @see: L{setPrivateModes} | |
| 249 @see: L{insults.ITerminalTransport.resetPrivateModes} | |
| 250 """ | |
| 251 for m in modes: | |
| 252 try: | |
| 253 del self.privateModes[m] | |
| 254 except KeyError: | |
| 255 pass | |
| 256 | |
| 257 | |
| 258 def applicationKeypadMode(self): | |
| 259 self.keypadMode = 'app' | |
| 260 | |
| 261 def numericKeypadMode(self): | |
| 262 self.keypadMode = 'num' | |
| 263 | |
| 264 def selectCharacterSet(self, charSet, which): | |
| 265 self.charsets[which] = charSet | |
| 266 | |
| 267 def shiftIn(self): | |
| 268 self.activeCharset = insults.G0 | |
| 269 | |
| 270 def shiftOut(self): | |
| 271 self.activeCharset = insults.G1 | |
| 272 | |
| 273 def singleShift2(self): | |
| 274 oldActiveCharset = self.activeCharset | |
| 275 self.activeCharset = insults.G2 | |
| 276 f = self.insertAtCursor | |
| 277 def insertAtCursor(b): | |
| 278 f(b) | |
| 279 del self.insertAtCursor | |
| 280 self.activeCharset = oldActiveCharset | |
| 281 self.insertAtCursor = insertAtCursor | |
| 282 | |
| 283 def singleShift3(self): | |
| 284 oldActiveCharset = self.activeCharset | |
| 285 self.activeCharset = insults.G3 | |
| 286 f = self.insertAtCursor | |
| 287 def insertAtCursor(b): | |
| 288 f(b) | |
| 289 del self.insertAtCursor | |
| 290 self.activeCharset = oldActiveCharset | |
| 291 self.insertAtCursor = insertAtCursor | |
| 292 | |
| 293 def selectGraphicRendition(self, *attributes): | |
| 294 for a in attributes: | |
| 295 if a == insults.NORMAL: | |
| 296 self.graphicRendition = { | |
| 297 'bold': False, | |
| 298 'underline': False, | |
| 299 'blink': False, | |
| 300 'reverseVideo': False, | |
| 301 'foreground': WHITE, | |
| 302 'background': BLACK} | |
| 303 elif a == insults.BOLD: | |
| 304 self.graphicRendition['bold'] = True | |
| 305 elif a == insults.UNDERLINE: | |
| 306 self.graphicRendition['underline'] = True | |
| 307 elif a == insults.BLINK: | |
| 308 self.graphicRendition['blink'] = True | |
| 309 elif a == insults.REVERSE_VIDEO: | |
| 310 self.graphicRendition['reverseVideo'] = True | |
| 311 else: | |
| 312 try: | |
| 313 v = int(a) | |
| 314 except ValueError: | |
| 315 log.msg("Unknown graphic rendition attribute: " + repr(a)) | |
| 316 else: | |
| 317 if FOREGROUND <= v <= FOREGROUND + N_COLORS: | |
| 318 self.graphicRendition['foreground'] = v - FOREGROUND | |
| 319 elif BACKGROUND <= v <= BACKGROUND + N_COLORS: | |
| 320 self.graphicRendition['background'] = v - BACKGROUND | |
| 321 else: | |
| 322 log.msg("Unknown graphic rendition attribute: " + repr(a
)) | |
| 323 | |
| 324 def eraseLine(self): | |
| 325 self.lines[self.y] = self._emptyLine(self.width) | |
| 326 | |
| 327 def eraseToLineEnd(self): | |
| 328 width = self.width - self.x | |
| 329 self.lines[self.y][self.x:] = self._emptyLine(width) | |
| 330 | |
| 331 def eraseToLineBeginning(self): | |
| 332 self.lines[self.y][:self.x + 1] = self._emptyLine(self.x + 1) | |
| 333 | |
| 334 def eraseDisplay(self): | |
| 335 self.lines = [self._emptyLine(self.width) for i in xrange(self.height)] | |
| 336 | |
| 337 def eraseToDisplayEnd(self): | |
| 338 self.eraseToLineEnd() | |
| 339 height = self.height - self.y - 1 | |
| 340 self.lines[self.y + 1:] = [self._emptyLine(self.width) for i in range(he
ight)] | |
| 341 | |
| 342 def eraseToDisplayBeginning(self): | |
| 343 self.eraseToLineBeginning() | |
| 344 self.lines[:self.y] = [self._emptyLine(self.width) for i in range(self.y
)] | |
| 345 | |
| 346 def deleteCharacter(self, n=1): | |
| 347 del self.lines[self.y][self.x:self.x+n] | |
| 348 self.lines[self.y].extend(self._emptyLine(min(self.width - self.x, n))) | |
| 349 | |
| 350 def insertLine(self, n=1): | |
| 351 self.lines[self.y:self.y] = [self._emptyLine(self.width) for i in range(
n)] | |
| 352 del self.lines[self.height:] | |
| 353 | |
| 354 def deleteLine(self, n=1): | |
| 355 del self.lines[self.y:self.y+n] | |
| 356 self.lines.extend([self._emptyLine(self.width) for i in range(n)]) | |
| 357 | |
| 358 def reportCursorPosition(self): | |
| 359 return (self.x, self.y) | |
| 360 | |
| 361 def reset(self): | |
| 362 self.home = insults.Vector(0, 0) | |
| 363 self.x = self.y = 0 | |
| 364 self.modes = {} | |
| 365 self.privateModes = {} | |
| 366 self.setPrivateModes([insults.privateModes.AUTO_WRAP, | |
| 367 insults.privateModes.CURSOR_MODE]) | |
| 368 self.numericKeypad = 'app' | |
| 369 self.activeCharset = insults.G0 | |
| 370 self.graphicRendition = { | |
| 371 'bold': False, | |
| 372 'underline': False, | |
| 373 'blink': False, | |
| 374 'reverseVideo': False, | |
| 375 'foreground': WHITE, | |
| 376 'background': BLACK} | |
| 377 self.charsets = { | |
| 378 insults.G0: insults.CS_US, | |
| 379 insults.G1: insults.CS_US, | |
| 380 insults.G2: insults.CS_ALTERNATE, | |
| 381 insults.G3: insults.CS_ALTERNATE_SPECIAL} | |
| 382 self.eraseDisplay() | |
| 383 | |
| 384 def unhandledControlSequence(self, buf): | |
| 385 print 'Could not handle', repr(buf) | |
| 386 | |
| 387 def __str__(self): | |
| 388 lines = [] | |
| 389 for L in self.lines: | |
| 390 buf = [] | |
| 391 length = 0 | |
| 392 for (ch, attr) in L: | |
| 393 if ch is not self.void: | |
| 394 buf.append(ch) | |
| 395 length = len(buf) | |
| 396 else: | |
| 397 buf.append(self.fill) | |
| 398 lines.append(''.join(buf[:length])) | |
| 399 return '\n'.join(lines) | |
| 400 | |
| 401 class ExpectationTimeout(Exception): | |
| 402 pass | |
| 403 | |
| 404 class ExpectableBuffer(TerminalBuffer): | |
| 405 _mark = 0 | |
| 406 | |
| 407 def connectionMade(self): | |
| 408 TerminalBuffer.connectionMade(self) | |
| 409 self._expecting = [] | |
| 410 | |
| 411 def write(self, bytes): | |
| 412 TerminalBuffer.write(self, bytes) | |
| 413 self._checkExpected() | |
| 414 | |
| 415 def cursorHome(self): | |
| 416 TerminalBuffer.cursorHome(self) | |
| 417 self._mark = 0 | |
| 418 | |
| 419 def _timeoutExpected(self, d): | |
| 420 d.errback(ExpectationTimeout()) | |
| 421 self._checkExpected() | |
| 422 | |
| 423 def _checkExpected(self): | |
| 424 s = str(self)[self._mark:] | |
| 425 while self._expecting: | |
| 426 expr, timer, deferred = self._expecting[0] | |
| 427 if timer and not timer.active(): | |
| 428 del self._expecting[0] | |
| 429 continue | |
| 430 for match in expr.finditer(s): | |
| 431 if timer: | |
| 432 timer.cancel() | |
| 433 del self._expecting[0] | |
| 434 self._mark += match.end() | |
| 435 s = s[match.end():] | |
| 436 deferred.callback(match) | |
| 437 break | |
| 438 else: | |
| 439 return | |
| 440 | |
| 441 def expect(self, expression, timeout=None, scheduler=reactor): | |
| 442 d = defer.Deferred() | |
| 443 timer = None | |
| 444 if timeout: | |
| 445 timer = scheduler.callLater(timeout, self._timeoutExpected, d) | |
| 446 self._expecting.append((re.compile(expression), timer, d)) | |
| 447 self._checkExpected() | |
| 448 return d | |
| 449 | |
| 450 __all__ = ['CharacterAttribute', 'TerminalBuffer', 'ExpectableBuffer'] | |
| OLD | NEW |