Index: third_party/twisted_8_1/twisted/conch/telnet.py |
diff --git a/third_party/twisted_8_1/twisted/conch/telnet.py b/third_party/twisted_8_1/twisted/conch/telnet.py |
deleted file mode 100644 |
index 8cdce9496f179a769b9b181254f76584b9f26d50..0000000000000000000000000000000000000000 |
--- a/third_party/twisted_8_1/twisted/conch/telnet.py |
+++ /dev/null |
@@ -1,996 +0,0 @@ |
-# -*- test-case-name: twisted.conch.test.test_telnet -*- |
-# Copyright (c) 2001-2007 Twisted Matrix Laboratories. |
-# See LICENSE for details. |
- |
-""" |
-Telnet protocol implementation. |
- |
-@author: U{Jp Calderone<mailto:exarkun@twistedmatrix.com>} |
-""" |
- |
-import struct |
- |
-from zope.interface import implements |
- |
-from twisted.internet import protocol, interfaces as iinternet, defer |
-from twisted.python import log |
- |
-MODE = chr(1) |
-EDIT = 1 |
-TRAPSIG = 2 |
-MODE_ACK = 4 |
-SOFT_TAB = 8 |
-LIT_ECHO = 16 |
- |
-# Characters gleaned from the various (and conflicting) RFCs. Not all of these are correct. |
- |
-NULL = chr(0) # No operation. |
-BEL = chr(7) # Produces an audible or |
- # visible signal (which does |
- # NOT move the print head). |
-BS = chr(8) # Moves the print head one |
- # character position towards |
- # the left margin. |
-HT = chr(9) # Moves the printer to the |
- # next horizontal tab stop. |
- # It remains unspecified how |
- # either party determines or |
- # establishes where such tab |
- # stops are located. |
-LF = chr(10) # Moves the printer to the |
- # next print line, keeping the |
- # same horizontal position. |
-VT = chr(11) # Moves the printer to the |
- # next vertical tab stop. It |
- # remains unspecified how |
- # either party determines or |
- # establishes where such tab |
- # stops are located. |
-FF = chr(12) # Moves the printer to the top |
- # of the next page, keeping |
- # the same horizontal position. |
-CR = chr(13) # Moves the printer to the left |
- # margin of the current line. |
- |
-ECHO = chr(1) # User-to-Server: Asks the server to send |
- # Echos of the transmitted data. |
-SGA = chr(3) # Suppress Go Ahead. Go Ahead is silly |
- # and most modern servers should suppress |
- # it. |
-NAWS = chr(31) # Negotiate About Window Size. Indicate that |
- # information about the size of the terminal |
- # can be communicated. |
-LINEMODE = chr(34) # Allow line buffering to be |
- # negotiated about. |
- |
-SE = chr(240) # End of subnegotiation parameters. |
-NOP = chr(241) # No operation. |
-DM = chr(242) # "Data Mark": The data stream portion |
- # of a Synch. This should always be |
- # accompanied by a TCP Urgent |
- # notification. |
-BRK = chr(243) # NVT character Break. |
-IP = chr(244) # The function Interrupt Process. |
-AO = chr(245) # The function Abort Output |
-AYT = chr(246) # The function Are You There. |
-EC = chr(247) # The function Erase Character. |
-EL = chr(248) # The function Erase Line |
-GA = chr(249) # The Go Ahead signal. |
-SB = chr(250) # Indicates that what follows is |
- # subnegotiation of the indicated |
- # option. |
-WILL = chr(251) # Indicates the desire to begin |
- # performing, or confirmation that |
- # you are now performing, the |
- # indicated option. |
-WONT = chr(252) # Indicates the refusal to perform, |
- # or continue performing, the |
- # indicated option. |
-DO = chr(253) # Indicates the request that the |
- # other party perform, or |
- # confirmation that you are expecting |
- # the other party to perform, the |
- # indicated option. |
-DONT = chr(254) # Indicates the demand that the |
- # other party stop performing, |
- # or confirmation that you are no |
- # longer expecting the other party |
- # to perform, the indicated option. |
-IAC = chr(255) # Data Byte 255. Introduces a |
- # telnet command. |
- |
-LINEMODE_MODE = chr(1) |
-LINEMODE_EDIT = chr(1) |
-LINEMODE_TRAPSIG = chr(2) |
-LINEMODE_MODE_ACK = chr(4) |
-LINEMODE_SOFT_TAB = chr(8) |
-LINEMODE_LIT_ECHO = chr(16) |
-LINEMODE_FORWARDMASK = chr(2) |
-LINEMODE_SLC = chr(3) |
-LINEMODE_SLC_SYNCH = chr(1) |
-LINEMODE_SLC_BRK = chr(2) |
-LINEMODE_SLC_IP = chr(3) |
-LINEMODE_SLC_AO = chr(4) |
-LINEMODE_SLC_AYT = chr(5) |
-LINEMODE_SLC_EOR = chr(6) |
-LINEMODE_SLC_ABORT = chr(7) |
-LINEMODE_SLC_EOF = chr(8) |
-LINEMODE_SLC_SUSP = chr(9) |
-LINEMODE_SLC_EC = chr(10) |
-LINEMODE_SLC_EL = chr(11) |
- |
-LINEMODE_SLC_EW = chr(12) |
-LINEMODE_SLC_RP = chr(13) |
-LINEMODE_SLC_LNEXT = chr(14) |
-LINEMODE_SLC_XON = chr(15) |
-LINEMODE_SLC_XOFF = chr(16) |
-LINEMODE_SLC_FORW1 = chr(17) |
-LINEMODE_SLC_FORW2 = chr(18) |
-LINEMODE_SLC_MCL = chr(19) |
-LINEMODE_SLC_MCR = chr(20) |
-LINEMODE_SLC_MCWL = chr(21) |
-LINEMODE_SLC_MCWR = chr(22) |
-LINEMODE_SLC_MCBOL = chr(23) |
-LINEMODE_SLC_MCEOL = chr(24) |
-LINEMODE_SLC_INSRT = chr(25) |
-LINEMODE_SLC_OVER = chr(26) |
-LINEMODE_SLC_ECR = chr(27) |
-LINEMODE_SLC_EWR = chr(28) |
-LINEMODE_SLC_EBOL = chr(29) |
-LINEMODE_SLC_EEOL = chr(30) |
- |
-LINEMODE_SLC_DEFAULT = chr(3) |
-LINEMODE_SLC_VALUE = chr(2) |
-LINEMODE_SLC_CANTCHANGE = chr(1) |
-LINEMODE_SLC_NOSUPPORT = chr(0) |
-LINEMODE_SLC_LEVELBITS = chr(3) |
- |
-LINEMODE_SLC_ACK = chr(128) |
-LINEMODE_SLC_FLUSHIN = chr(64) |
-LINEMODE_SLC_FLUSHOUT = chr(32) |
-LINEMODE_EOF = chr(236) |
-LINEMODE_SUSP = chr(237) |
-LINEMODE_ABORT = chr(238) |
- |
-class ITelnetProtocol(iinternet.IProtocol): |
- def unhandledCommand(command, argument): |
- """A command was received but not understood. |
- """ |
- |
- def unhandledSubnegotiation(bytes): |
- """A subnegotiation command was received but not understood. |
- """ |
- |
- def enableLocal(option): |
- """Enable the given option locally. |
- |
- This should enable the given option on this side of the |
- telnet connection and return True. If False is returned, |
- the option will be treated as still disabled and the peer |
- will be notified. |
- """ |
- |
- def enableRemote(option): |
- """Indicate whether the peer should be allowed to enable this option. |
- |
- Returns True if the peer should be allowed to enable this option, |
- False otherwise. |
- """ |
- |
- def disableLocal(option): |
- """Disable the given option locally. |
- |
- Unlike enableLocal, this method cannot fail. The option must be |
- disabled. |
- """ |
- |
- def disableRemote(option): |
- """Indicate that the peer has disabled this option. |
- """ |
- |
-class ITelnetTransport(iinternet.ITransport): |
- def do(option): |
- """Indicate a desire for the peer to begin performing the given option. |
- |
- Returns a Deferred that fires with True when the peer begins performing |
- the option, or False when the peer refuses to perform it. If the peer |
- is already performing the given option, the Deferred will fail with |
- L{AlreadyEnabled}. If a negotiation regarding this option is already |
- in progress, the Deferred will fail with L{AlreadyNegotiating}. |
- |
- Note: It is currently possible that this Deferred will never fire, |
- if the peer never responds, or if the peer believes the option to |
- already be enabled. |
- """ |
- |
- def dont(option): |
- """Indicate a desire for the peer to cease performing the given option. |
- |
- Returns a Deferred that fires with True when the peer ceases performing |
- the option. If the peer is not performing the given option, the |
- Deferred will fail with L{AlreadyDisabled}. If negotiation regarding |
- this option is already in progress, the Deferred will fail with |
- L{AlreadyNegotiating}. |
- |
- Note: It is currently possible that this Deferred will never fire, |
- if the peer never responds, or if the peer believes the option to |
- already be disabled. |
- """ |
- |
- def will(option): |
- """Indicate our willingness to begin performing this option locally. |
- |
- Returns a Deferred that fires with True when the peer agrees to allow |
- us to begin performing this option, or False if the peer refuses to |
- allow us to begin performing it. If the option is already enabled |
- locally, the Deferred will fail with L{AlreadyEnabled}. If negotiation |
- regarding this option is already in progress, the Deferred will fail with |
- L{AlreadyNegotiating}. |
- |
- Note: It is currently possible that this Deferred will never fire, |
- if the peer never responds, or if the peer believes the option to |
- already be enabled. |
- """ |
- |
- def wont(option): |
- """Indicate that we will stop performing the given option. |
- |
- Returns a Deferred that fires with True when the peer acknowledges |
- we have stopped performing this option. If the option is already |
- disabled locally, the Deferred will fail with L{AlreadyDisabled}. |
- If negotiation regarding this option is already in progress, |
- the Deferred will fail with L{AlreadyNegotiating}. |
- |
- Note: It is currently possible that this Deferred will never fire, |
- if the peer never responds, or if the peer believes the option to |
- already be disabled. |
- """ |
- |
- def requestNegotiation(about, bytes): |
- """Send a subnegotiation request. |
- |
- @param about: A byte indicating the feature being negotiated. |
- @param bytes: Any number of bytes containing specific information |
- about the negotiation being requested. No values in this string |
- need to be escaped, as this function will escape any value which |
- requires it. |
- """ |
- |
-class TelnetError(Exception): |
- pass |
- |
-class NegotiationError(TelnetError): |
- def __str__(self): |
- return self.__class__.__module__ + '.' + self.__class__.__name__ + ':' + repr(self.args[0]) |
- |
-class OptionRefused(NegotiationError): |
- pass |
- |
-class AlreadyEnabled(NegotiationError): |
- pass |
- |
-class AlreadyDisabled(NegotiationError): |
- pass |
- |
-class AlreadyNegotiating(NegotiationError): |
- pass |
- |
-class TelnetProtocol(protocol.Protocol): |
- implements(ITelnetProtocol) |
- |
- def unhandledCommand(self, command, argument): |
- pass |
- |
- def unhandledSubnegotiation(self, command, bytes): |
- pass |
- |
- def enableLocal(self, option): |
- pass |
- |
- def enableRemote(self, option): |
- pass |
- |
- def disableLocal(self, option): |
- pass |
- |
- def disableRemote(self, option): |
- pass |
- |
- |
-class Telnet(protocol.Protocol): |
- """ |
- @ivar commandMap: A mapping of bytes to callables. When a |
- telnet command is received, the command byte (the first byte |
- after IAC) is looked up in this dictionary. If a callable is |
- found, it is invoked with the argument of the command, or None |
- if the command takes no argument. Values should be added to |
- this dictionary if commands wish to be handled. By default, |
- only WILL, WONT, DO, and DONT are handled. These should not |
- be overridden, as this class handles them correctly and |
- provides an API for interacting with them. |
- |
- @ivar negotiationMap: A mapping of bytes to callables. When |
- a subnegotiation command is received, the command byte (the |
- first byte after SB) is looked up in this dictionary. If |
- a callable is found, it is invoked with the argument of the |
- subnegotiation. Values should be added to this dictionary if |
- subnegotiations are to be handled. By default, no values are |
- handled. |
- |
- @ivar options: A mapping of option bytes to their current |
- state. This state is likely of little use to user code. |
- Changes should not be made to it. |
- |
- @ivar state: A string indicating the current parse state. It |
- can take on the values "data", "escaped", "command", "newline", |
- "subnegotiation", and "subnegotiation-escaped". Changes |
- should not be made to it. |
- |
- @ivar transport: This protocol's transport object. |
- """ |
- |
- # One of a lot of things |
- state = 'data' |
- |
- def __init__(self): |
- self.options = {} |
- self.negotiationMap = {} |
- self.commandMap = { |
- WILL: self.telnet_WILL, |
- WONT: self.telnet_WONT, |
- DO: self.telnet_DO, |
- DONT: self.telnet_DONT} |
- |
- def _write(self, bytes): |
- self.transport.write(bytes) |
- |
- class _OptionState: |
- class _Perspective: |
- state = 'no' |
- negotiating = False |
- onResult = None |
- |
- def __str__(self): |
- return self.state + ('*' * self.negotiating) |
- |
- def __init__(self): |
- self.us = self._Perspective() |
- self.him = self._Perspective() |
- |
- def __repr__(self): |
- return '<_OptionState us=%s him=%s>' % (self.us, self.him) |
- |
- def getOptionState(self, opt): |
- return self.options.setdefault(opt, self._OptionState()) |
- |
- def _do(self, option): |
- self._write(IAC + DO + option) |
- |
- def _dont(self, option): |
- self._write(IAC + DONT + option) |
- |
- def _will(self, option): |
- self._write(IAC + WILL + option) |
- |
- def _wont(self, option): |
- self._write(IAC + WONT + option) |
- |
- def will(self, option): |
- """Indicate our willingness to enable an option. |
- """ |
- s = self.getOptionState(option) |
- if s.us.negotiating or s.him.negotiating: |
- return defer.fail(AlreadyNegotiating(option)) |
- elif s.us.state == 'yes': |
- return defer.fail(AlreadyEnabled(option)) |
- else: |
- s.us.negotiating = True |
- s.us.onResult = d = defer.Deferred() |
- self._will(option) |
- return d |
- |
- def wont(self, option): |
- """Indicate we are not willing to enable an option. |
- """ |
- s = self.getOptionState(option) |
- if s.us.negotiating or s.him.negotiating: |
- return defer.fail(AlreadyNegotiating(option)) |
- elif s.us.state == 'no': |
- return defer.fail(AlreadyDisabled(option)) |
- else: |
- s.us.negotiating = True |
- s.us.onResult = d = defer.Deferred() |
- self._wont(option) |
- return d |
- |
- def do(self, option): |
- s = self.getOptionState(option) |
- if s.us.negotiating or s.him.negotiating: |
- return defer.fail(AlreadyNegotiating(option)) |
- elif s.him.state == 'yes': |
- return defer.fail(AlreadyEnabled(option)) |
- else: |
- s.him.negotiating = True |
- s.him.onResult = d = defer.Deferred() |
- self._do(option) |
- return d |
- |
- def dont(self, option): |
- s = self.getOptionState(option) |
- if s.us.negotiating or s.him.negotiating: |
- return defer.fail(AlreadyNegotiating(option)) |
- elif s.him.state == 'no': |
- return defer.fail(AlreadyDisabled(option)) |
- else: |
- s.him.negotiating = True |
- s.him.onResult = d = defer.Deferred() |
- self._dont(option) |
- return d |
- |
- def requestNegotiation(self, about, bytes): |
- bytes = bytes.replace(IAC, IAC * 2) |
- self._write(IAC + SB + bytes + IAC + SE) |
- |
- def dataReceived(self, data): |
- appDataBuffer = [] |
- |
- for b in data: |
- if self.state == 'data': |
- if b == IAC: |
- self.state = 'escaped' |
- elif b == '\r': |
- self.state = 'newline' |
- else: |
- appDataBuffer.append(b) |
- elif self.state == 'escaped': |
- if b == IAC: |
- appDataBuffer.append(b) |
- self.state = 'data' |
- elif b == SB: |
- self.state = 'subnegotiation' |
- self.commands = [] |
- elif b in (NOP, DM, BRK, IP, AO, AYT, EC, EL, GA): |
- self.state = 'data' |
- if appDataBuffer: |
- self.applicationDataReceived(''.join(appDataBuffer)) |
- del appDataBuffer[:] |
- self.commandReceived(b, None) |
- elif b in (WILL, WONT, DO, DONT): |
- self.state = 'command' |
- self.command = b |
- else: |
- raise ValueError("Stumped", b) |
- elif self.state == 'command': |
- self.state = 'data' |
- command = self.command |
- del self.command |
- if appDataBuffer: |
- self.applicationDataReceived(''.join(appDataBuffer)) |
- del appDataBuffer[:] |
- self.commandReceived(command, b) |
- elif self.state == 'newline': |
- if b == '\n': |
- appDataBuffer.append('\n') |
- elif b == '\0': |
- appDataBuffer.append('\r') |
- else: |
- appDataBuffer.append('\r' + b) |
- self.state = 'data' |
- elif self.state == 'subnegotiation': |
- if b == IAC: |
- self.state = 'subnegotiation-escaped' |
- else: |
- self.commands.append(b) |
- elif self.state == 'subnegotiation-escaped': |
- if b == SE: |
- self.state = 'data' |
- commands = self.commands |
- del self.commands |
- if appDataBuffer: |
- self.applicationDataReceived(''.join(appDataBuffer)) |
- del appDataBuffer[:] |
- self.negotiate(commands) |
- else: |
- self.state = 'subnegotiation' |
- self.commands.append(b) |
- else: |
- raise ValueError("How'd you do this?") |
- |
- if appDataBuffer: |
- self.applicationDataReceived(''.join(appDataBuffer)) |
- |
- |
- def connectionLost(self, reason): |
- for state in self.options.values(): |
- if state.us.onResult is not None: |
- d = state.us.onResult |
- state.us.onResult = None |
- d.errback(reason) |
- if state.him.onResult is not None: |
- d = state.him.onResult |
- state.him.onResult = None |
- d.errback(reason) |
- |
- def applicationDataReceived(self, bytes): |
- """Called with application-level data. |
- """ |
- |
- def unhandledCommand(self, command, argument): |
- """Called for commands for which no handler is installed. |
- """ |
- |
- def commandReceived(self, command, argument): |
- cmdFunc = self.commandMap.get(command) |
- if cmdFunc is None: |
- self.unhandledCommand(command, argument) |
- else: |
- cmdFunc(argument) |
- |
- def unhandledSubnegotiation(self, command, bytes): |
- """Called for subnegotiations for which no handler is installed. |
- """ |
- |
- def negotiate(self, bytes): |
- command, bytes = bytes[0], bytes[1:] |
- cmdFunc = self.negotiationMap.get(command) |
- if cmdFunc is None: |
- self.unhandledSubnegotiation(command, bytes) |
- else: |
- cmdFunc(bytes) |
- |
- def telnet_WILL(self, option): |
- s = self.getOptionState(option) |
- self.willMap[s.him.state, s.him.negotiating](self, s, option) |
- |
- def will_no_false(self, state, option): |
- # He is unilaterally offering to enable an option. |
- if self.enableRemote(option): |
- state.him.state = 'yes' |
- self._do(option) |
- else: |
- self._dont(option) |
- |
- def will_no_true(self, state, option): |
- # Peer agreed to enable an option in response to our request. |
- state.him.state = 'yes' |
- state.him.negotiating = False |
- d = state.him.onResult |
- state.him.onResult = None |
- d.callback(True) |
- assert self.enableRemote(option), "enableRemote must return True in this context (for option %r)" % (option,) |
- |
- def will_yes_false(self, state, option): |
- # He is unilaterally offering to enable an already-enabled option. |
- # Ignore this. |
- pass |
- |
- def will_yes_true(self, state, option): |
- # This is a bogus state. It is here for completeness. It will |
- # never be entered. |
- assert False, "will_yes_true can never be entered, but was called with %r, %r" % (state, option) |
- |
- willMap = {('no', False): will_no_false, ('no', True): will_no_true, |
- ('yes', False): will_yes_false, ('yes', True): will_yes_true} |
- |
- def telnet_WONT(self, option): |
- s = self.getOptionState(option) |
- self.wontMap[s.him.state, s.him.negotiating](self, s, option) |
- |
- def wont_no_false(self, state, option): |
- # He is unilaterally demanding that an already-disabled option be/remain disabled. |
- # Ignore this (although we could record it and refuse subsequent enable attempts |
- # from our side - he can always refuse them again though, so we won't) |
- pass |
- |
- def wont_no_true(self, state, option): |
- # Peer refused to enable an option in response to our request. |
- state.him.negotiating = False |
- d = state.him.onResult |
- state.him.onResult = None |
- d.errback(OptionRefused(option)) |
- |
- def wont_yes_false(self, state, option): |
- # Peer is unilaterally demanding that an option be disabled. |
- state.him.state = 'no' |
- self.disableRemote(option) |
- self._dont(option) |
- |
- def wont_yes_true(self, state, option): |
- # Peer agreed to disable an option at our request. |
- state.him.state = 'no' |
- state.him.negotiating = False |
- d = state.him.onResult |
- state.him.onResult = None |
- d.callback(True) |
- self.disableRemote(option) |
- |
- wontMap = {('no', False): wont_no_false, ('no', True): wont_no_true, |
- ('yes', False): wont_yes_false, ('yes', True): wont_yes_true} |
- |
- def telnet_DO(self, option): |
- s = self.getOptionState(option) |
- self.doMap[s.us.state, s.us.negotiating](self, s, option) |
- |
- def do_no_false(self, state, option): |
- # Peer is unilaterally requesting that we enable an option. |
- if self.enableLocal(option): |
- state.us.state = 'yes' |
- self._will(option) |
- else: |
- self._wont(option) |
- |
- def do_no_true(self, state, option): |
- # Peer agreed to allow us to enable an option at our request. |
- state.us.state = 'yes' |
- state.us.negotiating = False |
- d = state.us.onResult |
- state.us.onResult = None |
- d.callback(True) |
- self.enableLocal(option) |
- |
- def do_yes_false(self, state, option): |
- # Peer is unilaterally requesting us to enable an already-enabled option. |
- # Ignore this. |
- pass |
- |
- def do_yes_true(self, state, option): |
- # This is a bogus state. It is here for completeness. It will never be |
- # entered. |
- assert False, "do_yes_true can never be entered, but was called with %r, %r" % (state, option) |
- |
- doMap = {('no', False): do_no_false, ('no', True): do_no_true, |
- ('yes', False): do_yes_false, ('yes', True): do_yes_true} |
- |
- def telnet_DONT(self, option): |
- s = self.getOptionState(option) |
- self.dontMap[s.us.state, s.us.negotiating](self, s, option) |
- |
- def dont_no_false(self, state, option): |
- # Peer is unilaterally demanding us to disable an already-disabled option. |
- # Ignore this. |
- pass |
- |
- def dont_no_true(self, state, option): |
- # This is a bogus state. It is here for completeness. It will never be |
- # entered. |
- assert False, "dont_no_true can never be entered, but was called with %r, %r" % (state, option) |
- |
- |
- def dont_yes_false(self, state, option): |
- # Peer is unilaterally demanding we disable an option. |
- state.us.state = 'no' |
- self.disableLocal(option) |
- self._wont(option) |
- |
- def dont_yes_true(self, state, option): |
- # Peer acknowledged our notice that we will disable an option. |
- state.us.state = 'no' |
- state.us.negotiating = False |
- d = state.us.onResult |
- state.us.onResult = None |
- d.callback(True) |
- self.disableLocal(option) |
- |
- dontMap = {('no', False): dont_no_false, ('no', True): dont_no_true, |
- ('yes', False): dont_yes_false, ('yes', True): dont_yes_true} |
- |
- def enableLocal(self, option): |
- """ |
- Reject all attempts to enable options. |
- """ |
- return False |
- |
- |
- def enableRemote(self, option): |
- """ |
- Reject all attempts to enable options. |
- """ |
- return False |
- |
- |
- def disableLocal(self, option): |
- """ |
- Signal a programming error by raising an exception. |
- |
- L{enableLocal} must return true for the given value of C{option} in |
- order for this method to be called. If a subclass of L{Telnet} |
- overrides enableLocal to allow certain options to be enabled, it must |
- also override disableLocal to disable those options. |
- |
- @raise NotImplementedError: Always raised. |
- """ |
- raise NotImplementedError( |
- "Don't know how to disable local telnet option %r" % (option,)) |
- |
- |
- def disableRemote(self, option): |
- """ |
- Signal a programming error by raising an exception. |
- |
- L{enableRemote} must return true for the given value of C{option} in |
- order for this method to be called. If a subclass of L{Telnet} |
- overrides enableRemote to allow certain options to be enabled, it must |
- also override disableRemote tto disable those options. |
- |
- @raise NotImplementedError: Always raised. |
- """ |
- raise NotImplementedError( |
- "Don't know how to disable remote telnet option %r" % (option,)) |
- |
- |
- |
-class ProtocolTransportMixin: |
- def write(self, bytes): |
- self.transport.write(bytes.replace('\n', '\r\n')) |
- |
- def writeSequence(self, seq): |
- self.transport.writeSequence(seq) |
- |
- def loseConnection(self): |
- self.transport.loseConnection() |
- |
- def getHost(self): |
- return self.transport.getHost() |
- |
- def getPeer(self): |
- return self.transport.getPeer() |
- |
-class TelnetTransport(Telnet, ProtocolTransportMixin): |
- """ |
- @ivar protocol: An instance of the protocol to which this |
- transport is connected, or None before the connection is |
- established and after it is lost. |
- |
- @ivar protocolFactory: A callable which returns protocol instances |
- which provide L{ITelnetProtocol}. This will be invoked when a |
- connection is established. It is passed *protocolArgs and |
- **protocolKwArgs. |
- |
- @ivar protocolArgs: A tuple of additional arguments to |
- pass to protocolFactory. |
- |
- @ivar protocolKwArgs: A dictionary of additional arguments |
- to pass to protocolFactory. |
- """ |
- |
- disconnecting = False |
- |
- protocolFactory = None |
- protocol = None |
- |
- def __init__(self, protocolFactory=None, *a, **kw): |
- Telnet.__init__(self) |
- if protocolFactory is not None: |
- self.protocolFactory = protocolFactory |
- self.protocolArgs = a |
- self.protocolKwArgs = kw |
- |
- def connectionMade(self): |
- if self.protocolFactory is not None: |
- self.protocol = self.protocolFactory(*self.protocolArgs, **self.protocolKwArgs) |
- assert ITelnetProtocol.providedBy(self.protocol) |
- try: |
- factory = self.factory |
- except AttributeError: |
- pass |
- else: |
- self.protocol.factory = factory |
- self.protocol.makeConnection(self) |
- |
- def connectionLost(self, reason): |
- Telnet.connectionLost(self, reason) |
- if self.protocol is not None: |
- try: |
- self.protocol.connectionLost(reason) |
- finally: |
- del self.protocol |
- |
- def enableLocal(self, option): |
- return self.protocol.enableLocal(option) |
- |
- def enableRemote(self, option): |
- return self.protocol.enableRemote(option) |
- |
- def disableLocal(self, option): |
- return self.protocol.disableLocal(option) |
- |
- def disableRemote(self, option): |
- return self.protocol.disableRemote(option) |
- |
- def unhandledSubnegotiation(self, command, bytes): |
- self.protocol.unhandledSubnegotiation(command, bytes) |
- |
- def unhandledCommand(self, command, argument): |
- self.protocol.unhandledCommand(command, argument) |
- |
- def applicationDataReceived(self, bytes): |
- self.protocol.dataReceived(bytes) |
- |
- def write(self, data): |
- ProtocolTransportMixin.write(self, data.replace('\xff','\xff\xff')) |
- |
- |
-class TelnetBootstrapProtocol(TelnetProtocol, ProtocolTransportMixin): |
- implements() |
- |
- protocol = None |
- |
- def __init__(self, protocolFactory, *args, **kw): |
- self.protocolFactory = protocolFactory |
- self.protocolArgs = args |
- self.protocolKwArgs = kw |
- |
- def connectionMade(self): |
- self.transport.negotiationMap[NAWS] = self.telnet_NAWS |
- self.transport.negotiationMap[LINEMODE] = self.telnet_LINEMODE |
- |
- for opt in (LINEMODE, NAWS, SGA): |
- self.transport.do(opt).addErrback(log.err) |
- for opt in (ECHO,): |
- self.transport.will(opt).addErrback(log.err) |
- |
- self.protocol = self.protocolFactory(*self.protocolArgs, **self.protocolKwArgs) |
- |
- try: |
- factory = self.factory |
- except AttributeError: |
- pass |
- else: |
- self.protocol.factory = factory |
- |
- self.protocol.makeConnection(self) |
- |
- def connectionLost(self, reason): |
- if self.protocol is not None: |
- try: |
- self.protocol.connectionLost(reason) |
- finally: |
- del self.protocol |
- |
- def dataReceived(self, data): |
- self.protocol.dataReceived(data) |
- |
- def enableLocal(self, opt): |
- if opt == ECHO: |
- return True |
- elif opt == SGA: |
- return True |
- else: |
- return False |
- |
- def enableRemote(self, opt): |
- if opt == LINEMODE: |
- self.transport.requestNegotiation(LINEMODE, MODE + chr(TRAPSIG)) |
- return True |
- elif opt == NAWS: |
- return True |
- elif opt == SGA: |
- return True |
- else: |
- return False |
- |
- def telnet_NAWS(self, bytes): |
- # NAWS is client -> server *only*. self.protocol will |
- # therefore be an ITerminalTransport, the `.protocol' |
- # attribute of which will be an ITerminalProtocol. Maybe. |
- # You know what, XXX TODO clean this up. |
- if len(bytes) == 4: |
- width, height = struct.unpack('!HH', ''.join(bytes)) |
- self.protocol.terminalProtocol.terminalSize(width, height) |
- else: |
- log.msg("Wrong number of NAWS bytes") |
- |
- |
- linemodeSubcommands = { |
- LINEMODE_SLC: 'SLC'} |
- def telnet_LINEMODE(self, bytes): |
- revmap = {} |
- linemodeSubcommand = bytes[0] |
- if 0: |
- # XXX TODO: This should be enabled to parse linemode subnegotiation. |
- getattr(self, 'linemode_' + self.linemodeSubcommands[linemodeSubcommand])(bytes[1:]) |
- |
- def linemode_SLC(self, bytes): |
- chunks = zip(*[iter(bytes)]*3) |
- for slcFunction, slcValue, slcWhat in chunks: |
- # Later, we should parse stuff. |
- 'SLC', ord(slcFunction), ord(slcValue), ord(slcWhat) |
- |
-from twisted.protocols import basic |
- |
-class StatefulTelnetProtocol(basic.LineReceiver, TelnetProtocol): |
- delimiter = '\n' |
- |
- state = 'Discard' |
- |
- def connectionLost(self, reason): |
- basic.LineReceiver.connectionLost(self, reason) |
- TelnetProtocol.connectionLost(self, reason) |
- |
- def lineReceived(self, line): |
- oldState = self.state |
- newState = getattr(self, "telnet_" + oldState)(line) |
- if newState is not None: |
- if self.state == oldState: |
- self.state = newState |
- else: |
- log.msg("Warning: state changed and new state returned") |
- |
- def telnet_Discard(self, line): |
- pass |
- |
-from twisted.cred import credentials |
- |
-class AuthenticatingTelnetProtocol(StatefulTelnetProtocol): |
- """A protocol which prompts for credentials and attempts to authenticate them. |
- |
- Username and password prompts are given (the password is obscured). When the |
- information is collected, it is passed to a portal and an avatar implementing |
- L{ITelnetProtocol} is requested. If an avatar is returned, it connected to this |
- protocol's transport, and this protocol's transport is connected to it. |
- Otherwise, the user is re-prompted for credentials. |
- """ |
- |
- state = "User" |
- protocol = None |
- |
- def __init__(self, portal): |
- self.portal = portal |
- |
- def connectionMade(self): |
- self.transport.write("Username: ") |
- |
- def connectionLost(self, reason): |
- StatefulTelnetProtocol.connectionLost(self, reason) |
- if self.protocol is not None: |
- try: |
- self.protocol.connectionLost(reason) |
- self.logout() |
- finally: |
- del self.protocol, self.logout |
- |
- def telnet_User(self, line): |
- self.username = line |
- self.transport.will(ECHO) |
- self.transport.write("Password: ") |
- return 'Password' |
- |
- def telnet_Password(self, line): |
- username, password = self.username, line |
- del self.username |
- def login(ignored): |
- creds = credentials.UsernamePassword(username, password) |
- d = self.portal.login(creds, None, ITelnetProtocol) |
- d.addCallback(self._cbLogin) |
- d.addErrback(self._ebLogin) |
- self.transport.wont(ECHO).addCallback(login) |
- return 'Discard' |
- |
- def _cbLogin(self, ial): |
- interface, protocol, logout = ial |
- assert interface is ITelnetProtocol |
- self.protocol = protocol |
- self.logout = logout |
- self.state = 'Command' |
- |
- protocol.makeConnection(self.transport) |
- self.transport.protocol = protocol |
- |
- def _ebLogin(self, failure): |
- self.transport.write("\nAuthentication failed\n") |
- self.transport.write("Username: ") |
- self.state = "User" |
- |
-__all__ = [ |
- # Exceptions |
- 'TelnetError', 'NegotiationError', 'OptionRefused', |
- 'AlreadyNegotiating', 'AlreadyEnabled', 'AlreadyDisabled', |
- |
- # Interfaces |
- 'ITelnetProtocol', 'ITelnetTransport', |
- |
- # Other stuff, protocols, etc. |
- 'Telnet', 'TelnetProtocol', 'TelnetTransport', |
- 'TelnetBootstrapProtocol', |
- |
- ] |