Index: third_party/twisted_8_1/twisted/conch/test/test_ssh.py |
diff --git a/third_party/twisted_8_1/twisted/conch/test/test_ssh.py b/third_party/twisted_8_1/twisted/conch/test/test_ssh.py |
deleted file mode 100644 |
index 7443e8ef2bb8f7dc19fcb6ded64305b4f0ca4fcf..0000000000000000000000000000000000000000 |
--- a/third_party/twisted_8_1/twisted/conch/test/test_ssh.py |
+++ /dev/null |
@@ -1,844 +0,0 @@ |
-# -*- test-case-name: twisted.conch.test.test_ssh -*- |
-# Copyright (c) 2001-2007 Twisted Matrix Laboratories. |
-# See LICENSE for details. |
- |
-try: |
- import Crypto |
-except ImportError: |
- Crypto = None |
- |
-from twisted.conch.ssh import common, session, forwarding |
-from twisted.conch import avatar, error |
-from twisted.conch.test.keydata import publicRSA_openssh, privateRSA_openssh |
-from twisted.conch.test.keydata import publicDSA_openssh, privateDSA_openssh |
-from twisted.cred import portal |
-from twisted.internet import defer, protocol, reactor |
-from twisted.internet.error import ProcessTerminated |
-from twisted.python import failure, log |
-from twisted.trial import unittest |
- |
-from test_recvline import LoopbackRelay |
- |
-import struct |
- |
- |
-class ConchTestRealm: |
- |
- def requestAvatar(self, avatarID, mind, *interfaces): |
- unittest.assertEquals(avatarID, 'testuser') |
- a = ConchTestAvatar() |
- return interfaces[0], a, a.logout |
- |
-class ConchTestAvatar(avatar.ConchUser): |
- loggedOut = False |
- |
- def __init__(self): |
- avatar.ConchUser.__init__(self) |
- self.listeners = {} |
- self.channelLookup.update({'session': session.SSHSession, |
- 'direct-tcpip':forwarding.openConnectForwardingClient}) |
- self.subsystemLookup.update({'crazy': CrazySubsystem}) |
- |
- def global_foo(self, data): |
- unittest.assertEquals(data, 'bar') |
- return 1 |
- |
- def global_foo_2(self, data): |
- unittest.assertEquals(data, 'bar2') |
- return 1, 'data' |
- |
- def global_tcpip_forward(self, data): |
- host, port = forwarding.unpackGlobal_tcpip_forward(data) |
- try: listener = reactor.listenTCP(port, |
- forwarding.SSHListenForwardingFactory(self.conn, |
- (host, port), |
- forwarding.SSHListenServerForwardingChannel), |
- interface = host) |
- except: |
- log.err() |
- unittest.fail("something went wrong with remote->local forwarding") |
- return 0 |
- else: |
- self.listeners[(host, port)] = listener |
- return 1 |
- |
- def global_cancel_tcpip_forward(self, data): |
- host, port = forwarding.unpackGlobal_tcpip_forward(data) |
- listener = self.listeners.get((host, port), None) |
- if not listener: |
- return 0 |
- del self.listeners[(host, port)] |
- listener.stopListening() |
- return 1 |
- |
- def logout(self): |
- loggedOut = True |
- for listener in self.listeners.values(): |
- log.msg('stopListening %s' % listener) |
- listener.stopListening() |
- |
-class ConchSessionForTestAvatar: |
- |
- def __init__(self, avatar): |
- unittest.assert_(isinstance(avatar, ConchTestAvatar)) |
- self.avatar = avatar |
- self.cmd = None |
- self.proto = None |
- self.ptyReq = False |
- self.eof = 0 |
- |
- def getPty(self, term, windowSize, attrs): |
- log.msg('pty req') |
- unittest.assertEquals(term, 'conch-test-term') |
- unittest.assertEquals(windowSize, (24, 80, 0, 0)) |
- self.ptyReq = True |
- |
- def openShell(self, proto): |
- log.msg('openning shell') |
- unittest.assertEquals(self.ptyReq, True) |
- self.proto = proto |
- EchoTransport(proto) |
- self.cmd = 'shell' |
- |
- def execCommand(self, proto, cmd): |
- self.cmd = cmd |
- unittest.assert_(cmd.split()[0] in ['false', 'echo', 'secho', 'eecho','jumboliah'], |
- 'invalid command: %s' % cmd.split()[0]) |
- if cmd == 'jumboliah': |
- raise error.ConchError('bad exec') |
- self.proto = proto |
- f = cmd.split()[0] |
- if f == 'false': |
- FalseTransport(proto) |
- elif f == 'echo': |
- t = EchoTransport(proto) |
- t.write(cmd[5:]) |
- t.loseConnection() |
- elif f == 'secho': |
- t = SuperEchoTransport(proto) |
- t.write(cmd[6:]) |
- t.loseConnection() |
- elif f == 'eecho': |
- t = ErrEchoTransport(proto) |
- t.write(cmd[6:]) |
- t.loseConnection() |
- self.avatar.conn.transport.expectedLoseConnection = 1 |
- |
-# def closeReceived(self): |
-# #if self.proto: |
-# # self.proto.transport.loseConnection() |
-# self.loseConnection() |
- |
- def eofReceived(self): |
- self.eof = 1 |
- |
- def closed(self): |
- log.msg('closed cmd "%s"' % self.cmd) |
- if self.cmd == 'echo hello': |
- rwl = self.proto.session.remoteWindowLeft |
- unittest.assertEquals(rwl, 4) |
- elif self.cmd == 'eecho hello': |
- rwl = self.proto.session.remoteWindowLeft |
- unittest.assertEquals(rwl, 4) |
- elif self.cmd == 'shell': |
- unittest.assert_(self.eof) |
- |
-from twisted.python import components |
-components.registerAdapter(ConchSessionForTestAvatar, ConchTestAvatar, session.ISession) |
- |
-class CrazySubsystem(protocol.Protocol): |
- |
- def __init__(self, *args, **kw): |
- pass |
- |
- def connectionMade(self): |
- """ |
- good ... good |
- """ |
- |
-class FalseTransport: |
- |
- def __init__(self, p): |
- p.makeConnection(self) |
- p.processEnded(failure.Failure(ProcessTerminated(255, None, None))) |
- |
- def loseConnection(self): |
- pass |
- |
-class EchoTransport: |
- |
- def __init__(self, p): |
- self.proto = p |
- p.makeConnection(self) |
- self.closed = 0 |
- |
- def write(self, data): |
- log.msg(repr(data)) |
- self.proto.outReceived(data) |
- self.proto.outReceived('\r\n') |
- if '\x00' in data: # mimic 'exit' for the shell test |
- self.loseConnection() |
- |
- def loseConnection(self): |
- if self.closed: return |
- self.closed = 1 |
- self.proto.inConnectionLost() |
- self.proto.outConnectionLost() |
- self.proto.errConnectionLost() |
- self.proto.processEnded(failure.Failure(ProcessTerminated(0, None, None))) |
- |
-class ErrEchoTransport: |
- |
- def __init__(self, p): |
- self.proto = p |
- p.makeConnection(self) |
- self.closed = 0 |
- |
- def write(self, data): |
- self.proto.errReceived(data) |
- self.proto.errReceived('\r\n') |
- |
- def loseConnection(self): |
- if self.closed: return |
- self.closed = 1 |
- self.proto.inConnectionLost() |
- self.proto.outConnectionLost() |
- self.proto.errConnectionLost() |
- self.proto.processEnded(failure.Failure(ProcessTerminated(0, None, None))) |
- |
-class SuperEchoTransport: |
- |
- def __init__(self, p): |
- self.proto = p |
- p.makeConnection(self) |
- self.closed = 0 |
- |
- def write(self, data): |
- self.proto.outReceived(data) |
- self.proto.outReceived('\r\n') |
- self.proto.errReceived(data) |
- self.proto.errReceived('\r\n') |
- |
- def loseConnection(self): |
- if self.closed: return |
- self.closed = 1 |
- self.proto.inConnectionLost() |
- self.proto.outConnectionLost() |
- self.proto.errConnectionLost() |
- self.proto.processEnded(failure.Failure(ProcessTerminated(0, None, None))) |
- |
- |
-if Crypto: # stuff that needs PyCrypto to even import |
- from twisted.conch import checkers |
- from twisted.conch.ssh import channel, connection, factory, keys |
- from twisted.conch.ssh import transport, userauth |
- |
- class UtilityTestCase(unittest.TestCase): |
- def testCounter(self): |
- c = transport._Counter('\x00\x00', 2) |
- for i in xrange(256 * 256): |
- self.assertEquals(c(), struct.pack('!H', (i + 1) % (2 ** 16))) |
- # It should wrap around, too. |
- for i in xrange(256 * 256): |
- self.assertEquals(c(), struct.pack('!H', (i + 1) % (2 ** 16))) |
- |
- |
- class ConchTestPublicKeyChecker(checkers.SSHPublicKeyDatabase): |
- def checkKey(self, credentials): |
- unittest.assertEquals(credentials.username, 'testuser', 'bad username') |
- unittest.assertEquals(credentials.blob, keys.getPublicKeyString(data=publicDSA_openssh)) |
- return 1 |
- |
- class ConchTestPasswordChecker: |
- credentialInterfaces = checkers.IUsernamePassword, |
- |
- def requestAvatarId(self, credentials): |
- unittest.assertEquals(credentials.username, 'testuser', 'bad username') |
- unittest.assertEquals(credentials.password, 'testpass', 'bad password') |
- return defer.succeed(credentials.username) |
- |
- class ConchTestSSHChecker(checkers.SSHProtocolChecker): |
- |
- def areDone(self, avatarId): |
- unittest.assertEquals(avatarId, 'testuser') |
- if len(self.successfulCredentials[avatarId]) < 2: |
- return 0 |
- else: |
- return 1 |
- |
- class ConchTestServerFactory(factory.SSHFactory): |
- noisy = 0 |
- |
- services = { |
- 'ssh-userauth':userauth.SSHUserAuthServer, |
- 'ssh-connection':connection.SSHConnection |
- } |
- |
- def buildProtocol(self, addr): |
- proto = ConchTestServer() |
- proto.supportedPublicKeys = self.privateKeys.keys() |
- proto.factory = self |
- |
- if hasattr(self, 'expectedLoseConnection'): |
- proto.expectedLoseConnection = self.expectedLoseConnection |
- |
- self.proto = proto |
- return proto |
- |
- def getPublicKeys(self): |
- return { |
- 'ssh-rsa':keys.getPublicKeyString(data=publicRSA_openssh), |
- 'ssh-dss':keys.getPublicKeyString(data=publicDSA_openssh) |
- } |
- |
- def getPrivateKeys(self): |
- return { |
- 'ssh-rsa':keys.getPrivateKeyObject(data=privateRSA_openssh), |
- 'ssh-dss':keys.getPrivateKeyObject(data=privateDSA_openssh) |
- } |
- |
- def getPrimes(self): |
- return { |
- 2048:[(transport.DH_GENERATOR, transport.DH_PRIME)] |
- } |
- |
- def getService(self, trans, name): |
- return factory.SSHFactory.getService(self, trans, name) |
- |
- class ConchTestBase: |
- |
- done = 0 |
- allowedToError = 0 |
- |
- def connectionLost(self, reason): |
- if self.done: |
- return |
- if not hasattr(self,'expectedLoseConnection'): |
- unittest.fail('unexpectedly lost connection %s\n%s' % (self, reason)) |
- self.done = 1 |
- |
- def receiveError(self, reasonCode, desc): |
- self.expectedLoseConnection = 1 |
- if not self.allowedToError: |
- unittest.fail('got disconnect for %s: reason %s, desc: %s' % |
- (self, reasonCode, desc)) |
- self.loseConnection() |
- |
- def receiveUnimplemented(self, seqID): |
- unittest.fail('got unimplemented: seqid %s' % seqID) |
- self.expectedLoseConnection = 1 |
- self.loseConnection() |
- |
- class ConchTestServer(ConchTestBase, transport.SSHServerTransport): |
- |
- def connectionLost(self, reason): |
- ConchTestBase.connectionLost(self, reason) |
- transport.SSHServerTransport.connectionLost(self, reason) |
- |
- class ConchTestClient(ConchTestBase, transport.SSHClientTransport): |
- |
- def connectionLost(self, reason): |
- ConchTestBase.connectionLost(self, reason) |
- transport.SSHClientTransport.connectionLost(self, reason) |
- |
- def verifyHostKey(self, key, fp): |
- unittest.assertEquals(key, keys.getPublicKeyString(data = publicRSA_openssh)) |
- unittest.assertEquals(fp,'3d:13:5f:cb:c9:79:8a:93:06:27:65:bc:3d:0b:8f:af') |
- return defer.succeed(1) |
- |
- def connectionSecure(self): |
- self.requestService(ConchTestClientAuth('testuser', |
- ConchTestClientConnection())) |
- |
- class ConchTestClientAuth(userauth.SSHUserAuthClient): |
- |
- hasTriedNone = 0 # have we tried the 'none' auth yet? |
- canSucceedPublicKey = 0 # can we succed with this yet? |
- canSucceedPassword = 0 |
- |
- def ssh_USERAUTH_SUCCESS(self, packet): |
- if not self.canSucceedPassword and self.canSucceedPublicKey: |
- unittest.fail('got USERAUTH_SUCESS before password and publickey') |
- userauth.SSHUserAuthClient.ssh_USERAUTH_SUCCESS(self, packet) |
- |
- def getPassword(self): |
- self.canSucceedPassword = 1 |
- return defer.succeed('testpass') |
- |
- def getPrivateKey(self): |
- self.canSucceedPublicKey = 1 |
- return defer.succeed(keys.getPrivateKeyObject(data=privateDSA_openssh)) |
- |
- def getPublicKey(self): |
- return keys.getPublicKeyString(data=publicDSA_openssh) |
- |
- class ConchTestClientConnection(connection.SSHConnection): |
- |
- name = 'ssh-connection' |
- results = 0 |
- totalResults = 8 |
- |
- def serviceStarted(self): |
- self.openChannel(SSHTestFailExecChannel(conn = self)) |
- self.openChannel(SSHTestFalseChannel(conn = self)) |
- self.openChannel(SSHTestEchoChannel(localWindow=4, localMaxPacket=5, conn = self)) |
- self.openChannel(SSHTestErrChannel(localWindow=4, localMaxPacket=5, conn = self)) |
- self.openChannel(SSHTestMaxPacketChannel(localWindow=12, localMaxPacket=1, conn = self)) |
- self.openChannel(SSHTestShellChannel(conn = self)) |
- self.openChannel(SSHTestSubsystemChannel(conn = self)) |
- self.openChannel(SSHUnknownChannel(conn = self)) |
- |
- def addResult(self): |
- self.results += 1 |
- log.msg('got %s of %s results' % (self.results, self.totalResults)) |
- if self.results == self.totalResults: |
- self.transport.expectedLoseConnection = 1 |
- self.serviceStopped() |
- |
- class SSHUnknownChannel(channel.SSHChannel): |
- |
- name = 'crazy-unknown-channel' |
- |
- def openFailed(self, reason): |
- """ |
- good .... good |
- """ |
- log.msg('unknown open failed') |
- log.flushErrors() |
- self.conn.addResult() |
- |
- def channelOpen(self, ignored): |
- unittest.fail("opened unknown channel") |
- |
- class SSHTestFailExecChannel(channel.SSHChannel): |
- |
- name = 'session' |
- |
- def openFailed(self, reason): |
- unittest.fail('fail exec open failed: %s' % reason) |
- |
- def channelOpen(self, ignore): |
- d = self.conn.sendRequest(self, 'exec', common.NS('jumboliah'), 1) |
- d.addCallback(self._cbRequestWorked) |
- d.addErrback(self._ebRequestWorked) |
- log.msg('opened fail exec') |
- |
- def _cbRequestWorked(self, ignored): |
- unittest.fail('fail exec succeeded') |
- |
- def _ebRequestWorked(self, ignored): |
- log.msg('fail exec finished') |
- log.flushErrors() |
- self.conn.addResult() |
- self.loseConnection() |
- |
- class SSHTestFalseChannel(channel.SSHChannel): |
- |
- name = 'session' |
- |
- def openFailed(self, reason): |
- unittest.fail('false open failed: %s' % reason) |
- |
- def channelOpen(self, ignored): |
- d = self.conn.sendRequest(self, 'exec', common.NS('false'), 1) |
- d.addCallback(self._cbRequestWorked) |
- d.addErrback(self._ebRequestFailed) |
- log.msg('opened false') |
- |
- def _cbRequestWorked(self, ignored): |
- pass |
- |
- def _ebRequestFailed(self, reason): |
- unittest.fail('false exec failed: %s' % reason) |
- |
- def dataReceived(self, data): |
- unittest.fail('got data when using false') |
- |
- def request_exit_status(self, status): |
- status, = struct.unpack('>L', status) |
- if status == 0: |
- unittest.fail('false exit status was 0') |
- log.msg('finished false') |
- self.conn.addResult() |
- return 1 |
- |
- class SSHTestEchoChannel(channel.SSHChannel): |
- |
- name = 'session' |
- testBuf = '' |
- eofCalled = 0 |
- |
- def openFailed(self, reason): |
- unittest.fail('echo open failed: %s' % reason) |
- |
- def channelOpen(self, ignore): |
- d = self.conn.sendRequest(self, 'exec', common.NS('echo hello'), 1) |
- d.addErrback(self._ebRequestFailed) |
- log.msg('opened echo') |
- |
- def _ebRequestFailed(self, reason): |
- unittest.fail('echo exec failed: %s' % reason) |
- |
- def dataReceived(self, data): |
- self.testBuf += data |
- |
- def errReceived(self, dataType, data): |
- unittest.fail('echo channel got extended data') |
- |
- def request_exit_status(self, status): |
- self.status ,= struct.unpack('>L', status) |
- |
- def eofReceived(self): |
- log.msg('eof received') |
- self.eofCalled = 1 |
- |
- def closed(self): |
- if self.status != 0: |
- unittest.fail('echo exit status was not 0: %i' % self.status) |
- if self.testBuf != "hello\r\n": |
- unittest.fail('echo did not return hello: %s' % repr(self.testBuf)) |
- unittest.assertEquals(self.localWindowLeft, 4) |
- unittest.assert_(self.eofCalled) |
- log.msg('finished echo') |
- self.conn.addResult() |
- return 1 |
- |
- class SSHTestErrChannel(channel.SSHChannel): |
- |
- name = 'session' |
- testBuf = '' |
- eofCalled = 0 |
- |
- def openFailed(self, reason): |
- unittest.fail('err open failed: %s' % reason) |
- |
- def channelOpen(self, ignore): |
- d = self.conn.sendRequest(self, 'exec', common.NS('eecho hello'), 1) |
- d.addErrback(self._ebRequestFailed) |
- log.msg('opened err') |
- |
- def _ebRequestFailed(self, reason): |
- unittest.fail('err exec failed: %s' % reason) |
- |
- def dataReceived(self, data): |
- unittest.fail('err channel got regular data: %s' % repr(data)) |
- |
- def extReceived(self, dataType, data): |
- unittest.assertEquals(dataType, connection.EXTENDED_DATA_STDERR) |
- self.testBuf += data |
- |
- def request_exit_status(self, status): |
- self.status ,= struct.unpack('>L', status) |
- |
- def eofReceived(self): |
- log.msg('eof received') |
- self.eofCalled = 1 |
- |
- def closed(self): |
- if self.status != 0: |
- unittest.fail('err exit status was not 0: %i' % self.status) |
- if self.testBuf != "hello\r\n": |
- unittest.fail('err did not return hello: %s' % repr(self.testBuf)) |
- unittest.assertEquals(self.localWindowLeft, 4) |
- unittest.assert_(self.eofCalled) |
- log.msg('finished err') |
- self.conn.addResult() |
- return 1 |
- |
- class SSHTestMaxPacketChannel(channel.SSHChannel): |
- |
- name = 'session' |
- testBuf = '' |
- testExtBuf = '' |
- eofCalled = 0 |
- |
- def openFailed(self, reason): |
- unittest.fail('max packet open failed: %s' % reason) |
- |
- def channelOpen(self, ignore): |
- d = self.conn.sendRequest(self, 'exec', common.NS('secho hello'), 1) |
- d.addErrback(self._ebRequestFailed) |
- log.msg('opened max packet') |
- |
- def _ebRequestFailed(self, reason): |
- unittest.fail('max packet exec failed: %s' % reason) |
- |
- def dataReceived(self, data): |
- self.testBuf += data |
- |
- def extReceived(self, dataType, data): |
- unittest.assertEquals(dataType, connection.EXTENDED_DATA_STDERR) |
- self.testExtBuf += data |
- |
- def request_exit_status(self, status): |
- self.status ,= struct.unpack('>L', status) |
- |
- def eofReceived(self): |
- log.msg('eof received') |
- self.eofCalled = 1 |
- |
- def closed(self): |
- if self.status != 0: |
- unittest.fail('echo exit status was not 0: %i' % self.status) |
- unittest.assertEquals(self.testBuf, 'hello\r\n') |
- unittest.assertEquals(self.testExtBuf, 'hello\r\n') |
- unittest.assertEquals(self.localWindowLeft, 12) |
- unittest.assert_(self.eofCalled) |
- log.msg('finished max packet') |
- self.conn.addResult() |
- return 1 |
- |
- class SSHTestShellChannel(channel.SSHChannel): |
- |
- name = 'session' |
- testBuf = '' |
- eofCalled = 0 |
- closeCalled = 0 |
- |
- def openFailed(self, reason): |
- unittest.fail('shell open failed: %s' % reason) |
- |
- def channelOpen(self, ignored): |
- data = session.packRequest_pty_req('conch-test-term', (24, 80, 0, 0), '') |
- d = self.conn.sendRequest(self, 'pty-req', data, 1) |
- d.addCallback(self._cbPtyReq) |
- d.addErrback(self._ebPtyReq) |
- log.msg('opened shell') |
- |
- def _cbPtyReq(self, ignored): |
- d = self.conn.sendRequest(self, 'shell', '', 1) |
- d.addCallback(self._cbShellOpen) |
- d.addErrback(self._ebShellOpen) |
- |
- def _ebPtyReq(self, reason): |
- unittest.fail('pty request failed: %s' % reason) |
- |
- def _cbShellOpen(self, ignored): |
- self.write('testing the shell!\x00') |
- self.conn.sendEOF(self) |
- |
- def _ebShellOpen(self, reason): |
- unittest.fail('shell request failed: %s' % reason) |
- |
- def dataReceived(self, data): |
- self.testBuf += data |
- |
- def request_exit_status(self, status): |
- self.status ,= struct.unpack('>L', status) |
- |
- def eofReceived(self): |
- self.eofCalled = 1 |
- |
- def closed(self): |
- log.msg('calling shell closed') |
- if self.status != 0: |
- log.msg('shell exit status was not 0: %i' % self.status) |
- unittest.assertEquals(self.testBuf, 'testing the shell!\x00\r\n') |
- unittest.assert_(self.eofCalled) |
- log.msg('finished shell') |
- self.conn.addResult() |
- |
- class SSHTestSubsystemChannel(channel.SSHChannel): |
- |
- name = 'session' |
- |
- def openFailed(self, reason): |
- unittest.fail('subsystem open failed: %s' % reason) |
- |
- def channelOpen(self, ignore): |
- d = self.conn.sendRequest(self, 'subsystem', common.NS('not-crazy'), 1) |
- d.addCallback(self._cbRequestWorked) |
- d.addErrback(self._ebRequestFailed) |
- |
- |
- def _cbRequestWorked(self, ignored): |
- unittest.fail('opened non-crazy subsystem') |
- |
- def _ebRequestFailed(self, ignored): |
- d = self.conn.sendRequest(self, 'subsystem', common.NS('crazy'), 1) |
- d.addCallback(self._cbRealRequestWorked) |
- d.addErrback(self._ebRealRequestFailed) |
- |
- def _cbRealRequestWorked(self, ignored): |
- d1 = self.conn.sendGlobalRequest('foo', 'bar', 1) |
- d1.addErrback(self._ebFirstGlobal) |
- |
- d2 = self.conn.sendGlobalRequest('foo-2', 'bar2', 1) |
- d2.addCallback(lambda x: unittest.assertEquals(x, 'data')) |
- d2.addErrback(self._ebSecondGlobal) |
- |
- d3 = self.conn.sendGlobalRequest('bar', 'foo', 1) |
- d3.addCallback(self._cbThirdGlobal) |
- d3.addErrback(lambda x,s=self: log.msg('subsystem finished') or s.conn.addResult() or s.loseConnection()) |
- |
- def _ebRealRequestFailed(self, reason): |
- unittest.fail('opening crazy subsystem failed: %s' % reason) |
- |
- def _ebFirstGlobal(self, reason): |
- unittest.fail('first global request failed: %s' % reason) |
- |
- def _ebSecondGlobal(self, reason): |
- unittest.fail('second global request failed: %s' % reason) |
- |
- def _cbThirdGlobal(self, ignored): |
- unittest.fail('second global request succeeded') |
- |
- |
- |
-class SSHProtocolTestCase(unittest.TestCase): |
- |
- if not Crypto: |
- skip = "can't run w/o PyCrypto" |
- |
- def testOurServerOurClient(self): |
- """test the Conch server against the Conch client |
- """ |
- realm = ConchTestRealm() |
- p = portal.Portal(realm) |
- sshpc = ConchTestSSHChecker() |
- sshpc.registerChecker(ConchTestPasswordChecker()) |
- sshpc.registerChecker(ConchTestPublicKeyChecker()) |
- p.registerChecker(sshpc) |
- fac = ConchTestServerFactory() |
- fac.portal = p |
- fac.startFactory() |
- self.server = fac.buildProtocol(None) |
- self.clientTransport = LoopbackRelay(self.server) |
- self.client = ConchTestClient() |
- self.serverTransport = LoopbackRelay(self.client) |
- |
- self.server.makeConnection(self.serverTransport) |
- self.client.makeConnection(self.clientTransport) |
- |
- while self.serverTransport.buffer or self.clientTransport.buffer: |
- log.callWithContext({'system': 'serverTransport'}, |
- self.serverTransport.clearBuffer) |
- log.callWithContext({'system': 'clientTransport'}, |
- self.clientTransport.clearBuffer) |
- self.failIf(self.server.done and self.client.done) |
- |
- |
-class TestSSHFactory(unittest.TestCase): |
- |
- if not Crypto: |
- skip = "can't run w/o PyCrypto" |
- |
- def testMultipleFactories(self): |
- f1 = factory.SSHFactory() |
- f2 = factory.SSHFactory() |
- gpk = lambda: {'ssh-rsa' : keys.Key(None)} |
- f1.getPrimes = lambda: None |
- f2.getPrimes = lambda: {1:(2,3)} |
- f1.getPublicKeys = f2.getPublicKeys = gpk |
- f1.getPrivateKeys = f2.getPrivateKeys = gpk |
- f1.startFactory() |
- f2.startFactory() |
- p1 = f1.buildProtocol(None) |
- p2 = f2.buildProtocol(None) |
- self.failIf('diffie-hellman-group-exchange-sha1' in p1.supportedKeyExchanges, |
- p1.supportedKeyExchanges) |
- self.failUnless('diffie-hellman-group-exchange-sha1' in p2.supportedKeyExchanges, |
- p2.supportedKeyExchanges) |
- |
- |
-class EntropyTestCase(unittest.TestCase): |
- """ |
- Tests for L{common.entropy}. |
- """ |
- |
- def test_deprecation(self): |
- """ |
- Test the deprecation of L{common.entropy.get_bytes}. |
- """ |
- def wrapper(): |
- return common.entropy.get_bytes(10) |
- self.assertWarns(DeprecationWarning, |
- "entropy.get_bytes is deprecated, please use " |
- "twisted.python.randbytes.secureRandom instead.", |
- __file__, wrapper) |
- |
- |
- |
-class MPTestCase(unittest.TestCase): |
- """ |
- Tests for L{common.getMP}. |
- |
- @cvar getMP: a method providing a MP parser. |
- @type getMP: C{callable} |
- """ |
- getMP = staticmethod(common.getMP) |
- |
- if not Crypto: |
- skip = "can't run w/o PyCrypto" |
- |
- |
- def test_getMP(self): |
- """ |
- L{common.getMP} should parse the a multiple precision integer from a |
- string: a 4-byte length followed by length bytes of the integer. |
- """ |
- self.assertEquals( |
- self.getMP('\x00\x00\x00\x04\x00\x00\x00\x01'), |
- (1, '')) |
- |
- |
- def test_getMPBigInteger(self): |
- """ |
- L{common.getMP} should be able to parse a big enough integer |
- (that doesn't fit on one byte). |
- """ |
- self.assertEquals( |
- self.getMP('\x00\x00\x00\x04\x01\x02\x03\x04'), |
- (16909060, '')) |
- |
- |
- def test_multipleGetMP(self): |
- """ |
- L{common.getMP} has the ability to parse multiple integer in the same |
- string. |
- """ |
- self.assertEquals( |
- self.getMP('\x00\x00\x00\x04\x00\x00\x00\x01' |
- '\x00\x00\x00\x04\x00\x00\x00\x02', 2), |
- (1, 2, '')) |
- |
- |
- def test_getMPRemainingData(self): |
- """ |
- When more data than needed is sent to L{common.getMP}, it should return |
- the remaining data. |
- """ |
- self.assertEquals( |
- self.getMP('\x00\x00\x00\x04\x00\x00\x00\x01foo'), |
- (1, 'foo')) |
- |
- |
- def test_notEnoughData(self): |
- """ |
- When the string passed to L{common.getMP} doesn't even make 5 bytes, |
- it should raise a L{struct.error}. |
- """ |
- self.assertRaises(struct.error, self.getMP, '\x02\x00') |
- |
- |
- |
-class PyMPTestCase(MPTestCase): |
- """ |
- Tests for the python implementation of L{common.getMP}. |
- """ |
- getMP = staticmethod(common.getMP_py) |
- |
- |
- |
-class GMPYMPTestCase(MPTestCase): |
- """ |
- Tests for the gmpy implementation of L{common.getMP}. |
- """ |
- getMP = staticmethod(common._fastgetMP) |
- |
- |
- |
-try: |
- import gmpy |
-except ImportError: |
- GMPYMPTestCase.skip = "gmpy not available" |