Index: third_party/twisted_8_1/twisted/conch/test/test_transport.py |
diff --git a/third_party/twisted_8_1/twisted/conch/test/test_transport.py b/third_party/twisted_8_1/twisted/conch/test/test_transport.py |
deleted file mode 100644 |
index 2749adfb15db00e3040da1e51f4bd270b8a53d2d..0000000000000000000000000000000000000000 |
--- a/third_party/twisted_8_1/twisted/conch/test/test_transport.py |
+++ /dev/null |
@@ -1,1930 +0,0 @@ |
-# Copyright (c) 2001-2008 Twisted Matrix Laboratories. |
-# See LICENSE for details. |
- |
-""" |
-Tests for ssh/transport.py and the classes therein. |
-""" |
- |
-import md5, sha |
- |
-try: |
- import Crypto |
-except ImportError: |
- Crypto = None |
- class transport: # fictional modules to make classes work |
- class SSHTransportBase: pass |
- class SSHServerTransport: pass |
- class SSHClientTransport: pass |
- class factory: |
- class SSHFactory: |
- pass |
-else: |
- from twisted.conch.ssh import transport, common, keys, factory |
- from twisted.conch.test import keydata |
- |
-from twisted.trial import unittest |
-from twisted.internet import defer |
-from twisted.protocols import loopback |
-from twisted.python import randbytes |
-from twisted.python.reflect import qual |
-from twisted.conch.ssh import service |
-from twisted.test import proto_helpers |
- |
-from twisted.conch.error import ConchError |
- |
- |
- |
-class MockTransportBase(transport.SSHTransportBase): |
- """ |
- A base class for the client and server protocols. Stores the messages |
- it receieves instead of ignoring them. |
- |
- @ivar errors: a list of tuples: (reasonCode, description) |
- @ivar unimplementeds: a list of integers: sequence number |
- @ivar debugs: a list of tuples: (alwaysDisplay, message, lang) |
- @ivar ignoreds: a list of strings: ignored data |
- """ |
- |
- |
- def connectionMade(self): |
- """ |
- Set up instance variables. |
- """ |
- transport.SSHTransportBase.connectionMade(self) |
- self.errors = [] |
- self.unimplementeds = [] |
- self.debugs = [] |
- self.ignoreds = [] |
- |
- |
- def receiveError(self, reasonCode, description): |
- """ |
- Store any errors received. |
- |
- @type reasonCode: C{int} |
- @type description: C{str} |
- """ |
- self.errors.append((reasonCode, description)) |
- |
- |
- def receiveUnimplemented(self, seqnum): |
- """ |
- Store any unimplemented packet messages. |
- |
- @type seqnum: C{int} |
- """ |
- self.unimplementeds.append(seqnum) |
- |
- |
- def receiveDebug(self, alwaysDisplay, message, lang): |
- """ |
- Store any debug messages. |
- |
- @type alwaysDisplay: C{bool} |
- @type message: C{str} |
- @type lang: C{str} |
- """ |
- self.debugs.append((alwaysDisplay, message, lang)) |
- |
- |
- def ssh_IGNORE(self, packet): |
- """ |
- Store any ignored data. |
- |
- @type packet: C{str} |
- """ |
- self.ignoreds.append(packet) |
- |
- |
-class MockCipher(object): |
- """ |
- A mocked-up version of twisted.conch.ssh.transport.SSHCiphers. |
- """ |
- outCipType = 'test' |
- encBlockSize = 6 |
- inCipType = 'test' |
- decBlockSize = 6 |
- inMACType = 'test' |
- outMACType = 'test' |
- verifyDigestSize = 1 |
- usedEncrypt = False |
- usedDecrypt = False |
- outMAC = (None, '', '', 1) |
- inMAC = (None, '', '', 1) |
- keys = () |
- |
- |
- def encrypt(self, x): |
- """ |
- Called to encrypt the packet. Simply record that encryption was used |
- and return the data unchanged. |
- """ |
- self.usedEncrypt = True |
- if (len(x) % self.encBlockSize) != 0: |
- raise RuntimeError("length %i modulo blocksize %i is not 0: %i" % |
- (len(x), self.encBlockSize, len(x) % self.encBlockSize)) |
- return x |
- |
- |
- def decrypt(self, x): |
- """ |
- Called to decrypt the packet. Simply record that decryption was used |
- and return the data unchanged. |
- """ |
- self.usedDecrypt = True |
- if (len(x) % self.encBlockSize) != 0: |
- raise RuntimeError("length %i modulo blocksize %i is not 0: %i" % |
- (len(x), self.decBlockSize, len(x) % self.decBlockSize)) |
- return x |
- |
- |
- def makeMAC(self, outgoingPacketSequence, payload): |
- """ |
- Make a Message Authentication Code by sending the character value of |
- the outgoing packet. |
- """ |
- return chr(outgoingPacketSequence) |
- |
- |
- def verify(self, incomingPacketSequence, packet, macData): |
- """ |
- Verify the Message Authentication Code by checking that the packet |
- sequence number is the same. |
- """ |
- return chr(incomingPacketSequence) == macData |
- |
- |
- def setKeys(self, ivOut, keyOut, ivIn, keyIn, macIn, macOut): |
- """ |
- Record the keys. |
- """ |
- self.keys = (ivOut, keyOut, ivIn, keyIn, macIn, macOut) |
- |
- |
- |
-class MockCompression: |
- """ |
- A mocked-up compression, based on the zlib interface. Instead of |
- compressing, it reverses the data and adds a 0x66 byte to the end. |
- """ |
- |
- |
- def compress(self, payload): |
- return payload[::-1] # reversed |
- |
- |
- def decompress(self, payload): |
- return payload[:-1][::-1] |
- |
- |
- def flush(self, kind): |
- return '\x66' |
- |
- |
- |
-class MockService(service.SSHService): |
- """ |
- A mocked-up service, based on twisted.conch.ssh.service.SSHService. |
- |
- @ivar started: True if this service has been started. |
- @ivar stopped: True if this service has been stopped. |
- """ |
- name = "MockService" |
- started = False |
- stopped = False |
- protocolMessages = {0xff: "MSG_TEST", 71: "MSG_fiction"} |
- |
- |
- def logPrefix(self): |
- return "MockService" |
- |
- |
- def serviceStarted(self): |
- """ |
- Record that the service was started. |
- """ |
- self.started = True |
- |
- |
- def serviceStopped(self): |
- """ |
- Record that the service was stopped. |
- """ |
- self.stopped = True |
- |
- |
- def ssh_TEST(self, packet): |
- """ |
- A message that this service responds to. |
- """ |
- self.transport.sendPacket(0xff, packet) |
- |
- |
-class MockFactory(factory.SSHFactory): |
- """ |
- A mocked-up factory based on twisted.conch.ssh.factory.SSHFactory. |
- """ |
- services = { |
- 'ssh-userauth': MockService} |
- |
- |
- def getPublicKeys(self): |
- """ |
- Return the public keys that authenticate this server. |
- """ |
- return { |
- 'ssh-rsa': keys.Key.fromString(keydata.publicRSA_openssh), |
- 'ssh-dsa': keys.Key.fromString(keydata.publicDSA_openssh)} |
- |
- |
- def getPrivateKeys(self): |
- """ |
- Return the private keys that authenticate this server. |
- """ |
- return { |
- 'ssh-rsa': keys.Key.fromString(keydata.privateRSA_openssh), |
- 'ssh-dsa': keys.Key.fromString(keydata.privateDSA_openssh)} |
- |
- |
- def getPrimes(self): |
- """ |
- Return the Diffie-Hellman primes that can be used for the |
- diffie-hellman-group-exchange-sha1 key exchange. |
- """ |
- return { |
- 1024: ((2, transport.DH_PRIME),), |
- 2048: ((3, transport.DH_PRIME),), |
- 4096: ((5, 7),)} |
- |
- |
- |
-class MockOldFactoryPublicKeys(MockFactory): |
- """ |
- The old SSHFactory returned mappings from key names to strings from |
- getPublicKeys(). We return those here for testing. |
- """ |
- |
- |
- def getPublicKeys(self): |
- """ |
- We used to map key types to public key blobs as strings. |
- """ |
- keys = MockFactory.getPublicKeys(self) |
- for name, key in keys.items()[:]: |
- keys[name] = key.blob() |
- return keys |
- |
- |
- |
-class MockOldFactoryPrivateKeys(MockFactory): |
- """ |
- The old SSHFactory returned mappings from key names to PyCrypto key |
- objects from getPrivateKeys(). We return those here for testing. |
- """ |
- |
- |
- def getPrivateKeys(self): |
- """ |
- We used to map key types to PyCrypto key objects. |
- """ |
- keys = MockFactory.getPrivateKeys(self) |
- for name, key in keys.items()[:]: |
- keys[name] = key.keyObject |
- return keys |
- |
- |
- |
-class TransportTestCase(unittest.TestCase): |
- """ |
- Base class for transport test cases. |
- """ |
- klass = None |
- |
- if Crypto is None: |
- skip = "cannot run w/o PyCrypto" |
- |
- |
- def setUp(self): |
- self.transport = proto_helpers.StringTransport() |
- self.proto = self.klass() |
- self.packets = [] |
- def secureRandom(len): |
- """ |
- Return a consistent entropy value |
- """ |
- return '\x99' * len |
- self.oldSecureRandom = randbytes.secureRandom |
- randbytes.secureRandom = secureRandom |
- def stubSendPacket(messageType, payload): |
- self.packets.append((messageType, payload)) |
- self.proto.makeConnection(self.transport) |
- # we just let the kex packet go into the transport |
- self.proto.sendPacket = stubSendPacket |
- |
- |
- def tearDown(self): |
- randbytes.secureRandom = self.oldSecureRandom |
- self.oldSecureRandom = None |
- |
- |
- |
-class BaseSSHTransportTestCase(TransportTestCase): |
- """ |
- Test TransportBase. It implements the non-server/client specific |
- parts of the SSH transport protocol. |
- """ |
- |
- klass = MockTransportBase |
- |
- |
- def test_sendVersion(self): |
- """ |
- Test that the first thing sent over the connection is the version |
- string. |
- """ |
- # the other setup was done in the setup method |
- self.assertEquals(self.transport.value().split('\r\n', 1)[0], |
- "SSH-2.0-Twisted") |
- |
- |
- def test_sendPacketPlain(self): |
- """ |
- Test that plain (unencrypted, uncompressed) packets are sent |
- correctly. The format is:: |
- uint32 length (including type and padding length) |
- byte padding length |
- byte type |
- bytes[length-padding length-2] data |
- bytes[padding length] padding |
- """ |
- proto = MockTransportBase() |
- proto.makeConnection(self.transport) |
- self.transport.clear() |
- message = ord('A') |
- payload = 'BCDEFG' |
- proto.sendPacket(message, payload) |
- value = self.transport.value() |
- self.assertEquals(value, '\x00\x00\x00\x0c\x04ABCDEFG\x99\x99\x99\x99') |
- |
- |
- def test_sendPacketEncrypted(self): |
- """ |
- Test that packets sent while encryption is enabled are sent |
- correctly. The whole packet should be encrypted. |
- """ |
- proto = MockTransportBase() |
- proto.makeConnection(self.transport) |
- proto.currentEncryptions = testCipher = MockCipher() |
- message = ord('A') |
- payload = 'BC' |
- self.transport.clear() |
- proto.sendPacket(message, payload) |
- self.assertTrue(testCipher.usedEncrypt) |
- value = self.transport.value() |
- self.assertEquals(value, '\x00\x00\x00\x08\x04ABC\x99\x99\x99\x99\x01') |
- |
- |
- def test_sendPacketCompressed(self): |
- """ |
- Test that packets sent while compression is enabled are sent |
- correctly. The packet type and data should be encrypted. |
- """ |
- proto = MockTransportBase() |
- proto.makeConnection(self.transport) |
- proto.outgoingCompression = MockCompression() |
- self.transport.clear() |
- proto.sendPacket(ord('A'), 'B') |
- value = self.transport.value() |
- self.assertEquals( |
- value, |
- '\x00\x00\x00\x0c\x08BA\x66\x99\x99\x99\x99\x99\x99\x99\x99') |
- |
- |
- def test_sendPacketBoth(self): |
- """ |
- Test that packets sent while compression and encryption are |
- enabled are sent correctly. The packet type and data should be |
- compressed and then the whole packet should be encrypted. |
- """ |
- proto = MockTransportBase() |
- proto.makeConnection(self.transport) |
- proto.currentEncryptions = testCipher = MockCipher() |
- proto.outgoingCompression = MockCompression() |
- message = ord('A') |
- payload = 'BC' |
- self.transport.clear() |
- proto.sendPacket(message, payload) |
- value = self.transport.value() |
- self.assertEquals( |
- value, |
- '\x00\x00\x00\x0e\x09CBA\x66\x99\x99\x99\x99\x99\x99\x99\x99\x99' |
- '\x01') |
- |
- |
- def test_getPacketPlain(self): |
- """ |
- Test that packets are retrieved correctly out of the buffer when |
- no encryption is enabled. |
- """ |
- proto = MockTransportBase() |
- proto.makeConnection(self.transport) |
- self.transport.clear() |
- proto.sendPacket(ord('A'), 'BC') |
- proto.buf = self.transport.value() + 'extra' |
- self.assertEquals(proto.getPacket(), 'ABC') |
- self.assertEquals(proto.buf, 'extra') |
- |
- |
- def test_getPacketEncrypted(self): |
- """ |
- Test that encrypted packets are retrieved correctly. |
- See test_sendPacketEncrypted. |
- """ |
- proto = MockTransportBase() |
- proto.sendKexInit = lambda: None # don't send packets |
- proto.makeConnection(self.transport) |
- self.transport.clear() |
- proto.currentEncryptions = testCipher = MockCipher() |
- proto.sendPacket(ord('A'), 'BCD') |
- value = self.transport.value() |
- proto.buf = value[:MockCipher.decBlockSize] |
- self.assertEquals(proto.getPacket(), None) |
- self.assertTrue(testCipher.usedDecrypt) |
- self.assertEquals(proto.first, '\x00\x00\x00\x0e\x09A') |
- proto.buf += value[MockCipher.decBlockSize:] |
- self.assertEquals(proto.getPacket(), 'ABCD') |
- self.assertEquals(proto.buf, '') |
- |
- |
- def test_getPacketCompressed(self): |
- """ |
- Test that compressed packets are retrieved correctly. See |
- test_sendPacketCompressed. |
- """ |
- proto = MockTransportBase() |
- proto.makeConnection(self.transport) |
- self.transport.clear() |
- proto.outgoingCompression = MockCompression() |
- proto.incomingCompression = proto.outgoingCompression |
- proto.sendPacket(ord('A'), 'BCD') |
- proto.buf = self.transport.value() |
- self.assertEquals(proto.getPacket(), 'ABCD') |
- |
- |
- def test_getPacketBoth(self): |
- """ |
- Test that compressed and encrypted packets are retrieved correctly. |
- See test_sendPacketBoth. |
- """ |
- proto = MockTransportBase() |
- proto.sendKexInit = lambda: None |
- proto.makeConnection(self.transport) |
- self.transport.clear() |
- proto.currentEncryptions = testCipher = MockCipher() |
- proto.outgoingCompression = MockCompression() |
- proto.incomingCompression = proto.outgoingCompression |
- proto.sendPacket(ord('A'), 'BCDEFG') |
- proto.buf = self.transport.value() |
- self.assertEquals(proto.getPacket(), 'ABCDEFG') |
- |
- |
- def test_ciphersAreValid(self): |
- """ |
- Test that all the supportedCiphers are valid. |
- """ |
- ciphers = transport.SSHCiphers('A', 'B', 'C', 'D') |
- iv = key = '\x00' * 16 |
- for cipName in self.proto.supportedCiphers: |
- self.assertTrue(ciphers._getCipher(cipName, iv, key)) |
- |
- |
- def test_sendKexInit(self): |
- """ |
- Test that the KEXINIT (key exchange initiation) message is sent |
- correctly. Payload:: |
- bytes[16] cookie |
- string key exchange algorithms |
- string public key algorithms |
- string outgoing ciphers |
- string incoming ciphers |
- string outgoing MACs |
- string incoming MACs |
- string outgoing compressions |
- string incoming compressions |
- bool first packet follows |
- uint32 0 |
- """ |
- value = self.transport.value().split('\r\n', 1)[1] |
- self.proto.buf = value |
- packet = self.proto.getPacket() |
- self.assertEquals(packet[0], chr(transport.MSG_KEXINIT)) |
- self.assertEquals(packet[1:17], '\x99' * 16) |
- (kex, pubkeys, ciphers1, ciphers2, macs1, macs2, compressions1, |
- compressions2, languages1, languages2, |
- buf) = common.getNS(packet[17:], 10) |
- |
- self.assertEquals(kex, ','.join(self.proto.supportedKeyExchanges)) |
- self.assertEquals(pubkeys, ','.join(self.proto.supportedPublicKeys)) |
- self.assertEquals(ciphers1, ','.join(self.proto.supportedCiphers)) |
- self.assertEquals(ciphers2, ','.join(self.proto.supportedCiphers)) |
- self.assertEquals(macs1, ','.join(self.proto.supportedMACs)) |
- self.assertEquals(macs2, ','.join(self.proto.supportedMACs)) |
- self.assertEquals(compressions1, |
- ','.join(self.proto.supportedCompressions)) |
- self.assertEquals(compressions2, |
- ','.join(self.proto.supportedCompressions)) |
- self.assertEquals(languages1, ','.join(self.proto.supportedLanguages)) |
- self.assertEquals(languages2, ','.join(self.proto.supportedLanguages)) |
- self.assertEquals(buf, '\x00' * 5) |
- |
- |
- def test_sendDebug(self): |
- """ |
- Test that debug messages are sent correctly. Payload:: |
- bool always display |
- string debug message |
- string language |
- """ |
- self.proto.sendDebug("test", True, 'en') |
- self.assertEquals( |
- self.packets, |
- [(transport.MSG_DEBUG, |
- "\x01\x00\x00\x00\x04test\x00\x00\x00\x02en")]) |
- |
- |
- def test_receiveDebug(self): |
- """ |
- Test that debug messages are received correctly. See test_sendDebug. |
- """ |
- self.proto.dispatchMessage( |
- transport.MSG_DEBUG, |
- '\x01\x00\x00\x00\x04test\x00\x00\x00\x02en') |
- self.assertEquals(self.proto.debugs, [(True, 'test', 'en')]) |
- |
- |
- def test_sendIgnore(self): |
- """ |
- Test that ignored messages are sent correctly. Payload:: |
- string ignored data |
- """ |
- self.proto.sendIgnore("test") |
- self.assertEquals( |
- self.packets, [(transport.MSG_IGNORE, |
- '\x00\x00\x00\x04test')]) |
- |
- |
- def test_receiveIgnore(self): |
- """ |
- Test that ignored messages are received correctly. See |
- test_sendIgnore. |
- """ |
- self.proto.dispatchMessage(transport.MSG_IGNORE, 'test') |
- self.assertEquals(self.proto.ignoreds, ['test']) |
- |
- |
- def test_sendUnimplemented(self): |
- """ |
- Test that unimplemented messages are sent correctly. Payload:: |
- uint32 sequence number |
- """ |
- self.proto.sendUnimplemented() |
- self.assertEquals( |
- self.packets, [(transport.MSG_UNIMPLEMENTED, |
- '\x00\x00\x00\x00')]) |
- |
- |
- def test_receiveUnimplemented(self): |
- """ |
- Test that unimplemented messages are received correctly. See |
- test_sendUnimplemented. |
- """ |
- self.proto.dispatchMessage(transport.MSG_UNIMPLEMENTED, |
- '\x00\x00\x00\xff') |
- self.assertEquals(self.proto.unimplementeds, [255]) |
- |
- |
- def test_sendDisconnect(self): |
- """ |
- Test that disconnection messages are sent correctly. Payload:: |
- uint32 reason code |
- string reason description |
- string language |
- """ |
- disconnected = [False] |
- def stubLoseConnection(): |
- disconnected[0] = True |
- self.transport.loseConnection = stubLoseConnection |
- self.proto.sendDisconnect(0xff, "test") |
- self.assertEquals( |
- self.packets, |
- [(transport.MSG_DISCONNECT, |
- "\x00\x00\x00\xff\x00\x00\x00\x04test\x00\x00\x00\x00")]) |
- self.assertTrue(disconnected[0]) |
- |
- |
- def test_receiveDisconnect(self): |
- """ |
- Test that disconnection messages are received correctly. See |
- test_sendDisconnect. |
- """ |
- disconnected = [False] |
- def stubLoseConnection(): |
- disconnected[0] = True |
- self.transport.loseConnection = stubLoseConnection |
- self.proto.dispatchMessage(transport.MSG_DISCONNECT, |
- '\x00\x00\x00\xff\x00\x00\x00\x04test') |
- self.assertEquals(self.proto.errors, [(255, 'test')]) |
- self.assertTrue(disconnected[0]) |
- |
- |
- def test_dataReceived(self): |
- """ |
- Test that dataReceived parses packets and dispatches them to |
- ssh_* methods. |
- """ |
- kexInit = [False] |
- def stubKEXINIT(packet): |
- kexInit[0] = True |
- self.proto.ssh_KEXINIT = stubKEXINIT |
- self.proto.dataReceived(self.transport.value()) |
- self.assertTrue(self.proto.gotVersion) |
- self.assertEquals(self.proto.ourVersionString, |
- self.proto.otherVersionString) |
- self.assertTrue(kexInit[0]) |
- |
- |
- def test_service(self): |
- """ |
- Test that the transport can set the running service and dispatches |
- packets to the service's packetReceived method. |
- """ |
- service = MockService() |
- self.proto.setService(service) |
- self.assertEquals(self.proto.service, service) |
- self.assertTrue(service.started) |
- self.proto.dispatchMessage(0xff, "test") |
- self.assertEquals(self.packets, [(0xff, "test")]) |
- |
- service2 = MockService() |
- self.proto.setService(service2) |
- self.assertTrue(service2.started) |
- self.assertTrue(service.stopped) |
- |
- self.proto.connectionLost(None) |
- self.assertTrue(service2.stopped) |
- |
- |
- def test_avatar(self): |
- """ |
- Test that the transport notifies the avatar of disconnections. |
- """ |
- disconnected = [False] |
- def logout(): |
- disconnected[0] = True |
- self.proto.logoutFunction = logout |
- self.proto.avatar = True |
- |
- self.proto.connectionLost(None) |
- self.assertTrue(disconnected[0]) |
- |
- |
- def test_isEncrypted(self): |
- """ |
- Test that the transport accurately reflects its encrypted status. |
- """ |
- self.assertFalse(self.proto.isEncrypted('in')) |
- self.assertFalse(self.proto.isEncrypted('out')) |
- self.assertFalse(self.proto.isEncrypted('both')) |
- self.proto.currentEncryptions = MockCipher() |
- self.assertTrue(self.proto.isEncrypted('in')) |
- self.assertTrue(self.proto.isEncrypted('out')) |
- self.assertTrue(self.proto.isEncrypted('both')) |
- self.proto.currentEncryptions = transport.SSHCiphers('none', 'none', |
- 'none', 'none') |
- self.assertFalse(self.proto.isEncrypted('in')) |
- self.assertFalse(self.proto.isEncrypted('out')) |
- self.assertFalse(self.proto.isEncrypted('both')) |
- |
- self.assertRaises(TypeError, self.proto.isEncrypted, 'bad') |
- |
- |
- def test_isVerified(self): |
- """ |
- Test that the transport accurately reflects its verified status. |
- """ |
- self.assertFalse(self.proto.isVerified('in')) |
- self.assertFalse(self.proto.isVerified('out')) |
- self.assertFalse(self.proto.isVerified('both')) |
- self.proto.currentEncryptions = MockCipher() |
- self.assertTrue(self.proto.isVerified('in')) |
- self.assertTrue(self.proto.isVerified('out')) |
- self.assertTrue(self.proto.isVerified('both')) |
- self.proto.currentEncryptions = transport.SSHCiphers('none', 'none', |
- 'none', 'none') |
- self.assertFalse(self.proto.isVerified('in')) |
- self.assertFalse(self.proto.isVerified('out')) |
- self.assertFalse(self.proto.isVerified('both')) |
- |
- self.assertRaises(TypeError, self.proto.isVerified, 'bad') |
- |
- |
- def test_loseConnection(self): |
- """ |
- Test that loseConnection sends a disconnect message and closes the |
- connection. |
- """ |
- disconnected = [False] |
- def stubLoseConnection(): |
- disconnected[0] = True |
- self.transport.loseConnection = stubLoseConnection |
- self.proto.loseConnection() |
- self.assertEquals(self.packets[0][0], transport.MSG_DISCONNECT) |
- self.assertEquals(self.packets[0][1][3], |
- chr(transport.DISCONNECT_CONNECTION_LOST)) |
- |
- |
- def test_badVersion(self): |
- """ |
- Test that the transport disconnects when it receives a bad version. |
- """ |
- def testBad(version): |
- self.packets = [] |
- self.proto.gotVersion = False |
- disconnected = [False] |
- def stubLoseConnection(): |
- disconnected[0] = True |
- self.transport.loseConnection = stubLoseConnection |
- for c in version + '\r\n': |
- self.proto.dataReceived(c) |
- self.assertTrue(disconnected[0]) |
- self.assertEquals(self.packets[0][0], transport.MSG_DISCONNECT) |
- self.assertEquals( |
- self.packets[0][1][3], |
- chr(transport.DISCONNECT_PROTOCOL_VERSION_NOT_SUPPORTED)) |
- testBad('SSH-1.5-OpenSSH') |
- testBad('SSH-3.0-Twisted') |
- testBad('GET / HTTP/1.1') |
- |
- |
- def test_dataBeforeVersion(self): |
- """ |
- Test that the transport ignores data sent before the version string. |
- """ |
- proto = MockTransportBase() |
- proto.makeConnection(proto_helpers.StringTransport()) |
- data = ("""here's some stuff beforehand |
-here's some other stuff |
-""" + proto.ourVersionString + "\r\n") |
- [proto.dataReceived(c) for c in data] |
- self.assertTrue(proto.gotVersion) |
- self.assertEquals(proto.otherVersionString, proto.ourVersionString) |
- |
- |
- def test_compatabilityVersion(self): |
- """ |
- Test that the transport treats the compatbility version (1.99) |
- as equivalent to version 2.0. |
- """ |
- proto = MockTransportBase() |
- proto.makeConnection(proto_helpers.StringTransport()) |
- proto.dataReceived("SSH-1.99-OpenSSH\n") |
- self.assertTrue(proto.gotVersion) |
- self.assertEquals(proto.otherVersionString, "SSH-1.99-OpenSSH") |
- |
- |
- def test_badPackets(self): |
- """ |
- Test that the transport disconnects with an error when it receives |
- bad packets. |
- """ |
- def testBad(packet, error=transport.DISCONNECT_PROTOCOL_ERROR): |
- self.packets = [] |
- self.proto.buf = packet |
- self.assertEquals(self.proto.getPacket(), None) |
- self.assertEquals(len(self.packets), 1) |
- self.assertEquals(self.packets[0][0], transport.MSG_DISCONNECT) |
- self.assertEquals(self.packets[0][1][3], chr(error)) |
- |
- testBad('\xff' * 8) # big packet |
- testBad('\x00\x00\x00\x05\x00BCDE') # length not modulo blocksize |
- oldEncryptions = self.proto.currentEncryptions |
- self.proto.currentEncryptions = MockCipher() |
- testBad('\x00\x00\x00\x08\x06AB123456', # bad MAC |
- transport.DISCONNECT_MAC_ERROR) |
- self.proto.currentEncryptions.decrypt = lambda x: x[:-1] |
- testBad('\x00\x00\x00\x08\x06BCDEFGHIJK') # bad decryption |
- self.proto.currentEncryptions = oldEncryptions |
- self.proto.incomingCompression = MockCompression() |
- def stubDecompress(payload): |
- raise Exception('bad compression') |
- self.proto.incomingCompression.decompress = stubDecompress |
- testBad('\x00\x00\x00\x04\x00BCDE', # bad decompression |
- transport.DISCONNECT_COMPRESSION_ERROR) |
- self.flushLoggedErrors() |
- |
- |
- def test_unimplementedPackets(self): |
- """ |
- Test that unimplemented packet types cause MSG_UNIMPLEMENTED packets |
- to be sent. |
- """ |
- seqnum = self.proto.incomingPacketSequence |
- def checkUnimplemented(seqnum=seqnum): |
- self.assertEquals(self.packets[0][0], |
- transport.MSG_UNIMPLEMENTED) |
- self.assertEquals(self.packets[0][1][3], chr(seqnum)) |
- self.proto.packets = [] |
- seqnum += 1 |
- |
- self.proto.dispatchMessage(40, '') |
- checkUnimplemented() |
- transport.messages[41] = 'MSG_fiction' |
- self.proto.dispatchMessage(41, '') |
- checkUnimplemented() |
- self.proto.dispatchMessage(60, '') |
- checkUnimplemented() |
- self.proto.setService(MockService()) |
- self.proto.dispatchMessage(70, '') |
- checkUnimplemented() |
- self.proto.dispatchMessage(71, '') |
- checkUnimplemented() |
- |
- |
- def test_getKey(self): |
- """ |
- Test that _getKey generates the correct keys. |
- """ |
- self.proto.sessionID = 'EF' |
- |
- k1 = sha.new('AB' + 'CD' |
- + 'K' + self.proto.sessionID).digest() |
- k2 = sha.new('ABCD' + k1).digest() |
- self.assertEquals(self.proto._getKey('K', 'AB', 'CD'), k1 + k2) |
- |
- |
- def test_multipleClasses(self): |
- """ |
- Test that multiple instances have distinct states. |
- """ |
- proto = self.proto |
- proto.dataReceived(self.transport.value()) |
- proto.currentEncryptions = MockCipher() |
- proto.outgoingCompression = MockCompression() |
- proto.incomingCompression = MockCompression() |
- proto.setService(MockService()) |
- proto2 = MockTransportBase() |
- proto2.makeConnection(proto_helpers.StringTransport()) |
- proto2.sendIgnore('') |
- self.failIfEquals(proto.gotVersion, proto2.gotVersion) |
- self.failIfEquals(proto.transport, proto2.transport) |
- self.failIfEquals(proto.outgoingPacketSequence, |
- proto2.outgoingPacketSequence) |
- self.failIfEquals(proto.incomingPacketSequence, |
- proto2.incomingPacketSequence) |
- self.failIfEquals(proto.currentEncryptions, |
- proto2.currentEncryptions) |
- self.failIfEquals(proto.service, proto2.service) |
- |
- |
- |
-class ServerAndClientSSHTransportBaseCase: |
- """ |
- Tests that need to be run on both the server and the client. |
- """ |
- |
- |
- def checkDisconnected(self, kind=None): |
- """ |
- Helper function to check if the transport disconnected. |
- """ |
- if kind is None: |
- kind = transport.DISCONNECT_PROTOCOL_ERROR |
- self.assertEquals(self.packets[-1][0], transport.MSG_DISCONNECT) |
- self.assertEquals(self.packets[-1][1][3], chr(kind)) |
- |
- |
- def connectModifiedProtocol(self, protoModification, |
- kind=None): |
- """ |
- Helper function to connect a modified protocol to the test protocol |
- and test for disconnection. |
- """ |
- if kind is None: |
- kind = transport.DISCONNECT_KEY_EXCHANGE_FAILED |
- proto2 = self.klass() |
- protoModification(proto2) |
- proto2.makeConnection(proto_helpers.StringTransport()) |
- self.proto.dataReceived(proto2.transport.value()) |
- if kind: |
- self.checkDisconnected(kind) |
- return proto2 |
- |
- |
- def test_disconnectIfCantMatchKex(self): |
- """ |
- Test that the transport disconnects if it can't match the key |
- exchange |
- """ |
- def blankKeyExchanges(proto2): |
- proto2.supportedKeyExchanges = [] |
- self.connectModifiedProtocol(blankKeyExchanges) |
- |
- |
- def test_disconnectIfCantMatchKeyAlg(self): |
- """ |
- Like test_disconnectIfCantMatchKex, but for the key algorithm. |
- """ |
- def blankPublicKeys(proto2): |
- proto2.supportedPublicKeys = [] |
- self.connectModifiedProtocol(blankPublicKeys) |
- |
- |
- def test_disconnectIfCantMatchCompression(self): |
- """ |
- Like test_disconnectIfCantMatchKex, but for the compression. |
- """ |
- def blankCompressions(proto2): |
- proto2.supportedCompressions = [] |
- self.connectModifiedProtocol(blankCompressions) |
- |
- |
- def test_disconnectIfCantMatchCipher(self): |
- """ |
- Like test_disconnectIfCantMatchKex, but for the encryption. |
- """ |
- def blankCiphers(proto2): |
- proto2.supportedCiphers = [] |
- self.connectModifiedProtocol(blankCiphers) |
- |
- |
- def test_disconnectIfCantMatchMAC(self): |
- """ |
- Like test_disconnectIfCantMatchKex, but for the MAC. |
- """ |
- def blankMACs(proto2): |
- proto2.supportedMACs = [] |
- self.connectModifiedProtocol(blankMACs) |
- |
- |
- |
-class ServerSSHTransportTestCase(ServerAndClientSSHTransportBaseCase, |
- TransportTestCase): |
- """ |
- Tests for the SSHServerTransport. |
- """ |
- |
- klass = transport.SSHServerTransport |
- |
- |
- def setUp(self): |
- TransportTestCase.setUp(self) |
- self.proto.factory = MockFactory() |
- self.proto.factory.startFactory() |
- |
- |
- def tearDown(self): |
- TransportTestCase.tearDown(self) |
- self.proto.factory.stopFactory() |
- del self.proto.factory |
- |
- |
- def test_KEXINIT(self): |
- """ |
- Test that receiving a KEXINIT packet sets up the correct values on the |
- server. |
- """ |
- self.proto.dataReceived( 'SSH-2.0-Twisted\r\n\x00\x00\x01\xd4\t\x14' |
- '\x99\x99\x99\x99\x99\x99\x99\x99\x99\x99\x99\x99\x99\x99\x99' |
- '\x99\x00\x00\x00=diffie-hellman-group1-sha1,diffie-hellman-g' |
- 'roup-exchange-sha1\x00\x00\x00\x0fssh-dss,ssh-rsa\x00\x00\x00' |
- '\x85aes128-ctr,aes128-cbc,aes192-ctr,aes192-cbc,aes256-ctr,ae' |
- 's256-cbc,cast128-ctr,cast128-cbc,blowfish-ctr,blowfish-cbc,3d' |
- 'es-ctr,3des-cbc\x00\x00\x00\x85aes128-ctr,aes128-cbc,aes192-c' |
- 'tr,aes192-cbc,aes256-ctr,aes256-cbc,cast128-ctr,cast128-cbc,b' |
- 'lowfish-ctr,blowfish-cbc,3des-ctr,3des-cbc\x00\x00\x00\x12hma' |
- 'c-md5,hmac-sha1\x00\x00\x00\x12hmac-md5,hmac-sha1\x00\x00\x00' |
- '\tnone,zlib\x00\x00\x00\tnone,zlib\x00\x00\x00\x00\x00\x00' |
- '\x00\x00\x00\x00\x00\x00\x00\x99\x99\x99\x99\x99\x99\x99\x99' |
- '\x99') |
- self.assertEquals(self.proto.kexAlg, |
- 'diffie-hellman-group1-sha1') |
- self.assertEquals(self.proto.keyAlg, |
- 'ssh-dss') |
- self.assertEquals(self.proto.outgoingCompressionType, |
- 'none') |
- self.assertEquals(self.proto.incomingCompressionType, |
- 'none') |
- ne = self.proto.nextEncryptions |
- self.assertEquals(ne.outCipType, 'aes128-ctr') |
- self.assertEquals(ne.inCipType, 'aes128-ctr') |
- self.assertEquals(ne.outMACType, 'hmac-md5') |
- self.assertEquals(ne.inMACType, 'hmac-md5') |
- |
- |
- def test_ignoreGuessPacketKex(self): |
- """ |
- The client is allowed to send a guessed key exchange packet |
- after it sends the KEXINIT packet. However, if the key exchanges |
- do not match, that guess packet must be ignored. This tests that |
- the packet is ignored in the case of the key exchange method not |
- matching. |
- """ |
- kexInitPacket = '\x00' * 16 + ( |
- ''.join([common.NS(x) for x in |
- [','.join(y) for y in |
- [self.proto.supportedKeyExchanges[::-1], |
- self.proto.supportedPublicKeys, |
- self.proto.supportedCiphers, |
- self.proto.supportedCiphers, |
- self.proto.supportedMACs, |
- self.proto.supportedMACs, |
- self.proto.supportedCompressions, |
- self.proto.supportedCompressions, |
- self.proto.supportedLanguages, |
- self.proto.supportedLanguages]]])) + ( |
- '\xff\x00\x00\x00\x00') |
- self.proto.ssh_KEXINIT(kexInitPacket) |
- self.assertTrue(self.proto.ignoreNextPacket) |
- self.proto.ssh_DEBUG("\x01\x00\x00\x00\x04test\x00\x00\x00\x00") |
- self.assertTrue(self.proto.ignoreNextPacket) |
- |
- |
- self.proto.ssh_KEX_DH_GEX_REQUEST_OLD('\x00\x00\x08\x00') |
- self.assertFalse(self.proto.ignoreNextPacket) |
- self.assertEquals(self.packets, []) |
- self.proto.ignoreNextPacket = True |
- |
- self.proto.ssh_KEX_DH_GEX_REQUEST('\x00\x00\x08\x00' * 3) |
- self.assertFalse(self.proto.ignoreNextPacket) |
- self.assertEquals(self.packets, []) |
- |
- |
- def test_ignoreGuessPacketKey(self): |
- """ |
- Like test_ignoreGuessPacketKex, but for an incorrectly guessed |
- public key format. |
- """ |
- kexInitPacket = '\x00' * 16 + ( |
- ''.join([common.NS(x) for x in |
- [','.join(y) for y in |
- [self.proto.supportedKeyExchanges, |
- self.proto.supportedPublicKeys[::-1], |
- self.proto.supportedCiphers, |
- self.proto.supportedCiphers, |
- self.proto.supportedMACs, |
- self.proto.supportedMACs, |
- self.proto.supportedCompressions, |
- self.proto.supportedCompressions, |
- self.proto.supportedLanguages, |
- self.proto.supportedLanguages]]])) + ( |
- '\xff\x00\x00\x00\x00') |
- self.proto.ssh_KEXINIT(kexInitPacket) |
- self.assertTrue(self.proto.ignoreNextPacket) |
- self.proto.ssh_DEBUG("\x01\x00\x00\x00\x04test\x00\x00\x00\x00") |
- self.assertTrue(self.proto.ignoreNextPacket) |
- |
- self.proto.ssh_KEX_DH_GEX_REQUEST_OLD('\x00\x00\x08\x00') |
- self.assertFalse(self.proto.ignoreNextPacket) |
- self.assertEquals(self.packets, []) |
- self.proto.ignoreNextPacket = True |
- |
- self.proto.ssh_KEX_DH_GEX_REQUEST('\x00\x00\x08\x00' * 3) |
- self.assertFalse(self.proto.ignoreNextPacket) |
- self.assertEquals(self.packets, []) |
- |
- |
- def test_KEXDH_INIT(self): |
- """ |
- Test that the KEXDH_INIT packet causes the server to send a |
- KEXDH_REPLY with the server's public key and a signature. |
- """ |
- self.proto.supportedKeyExchanges = ['diffie-hellman-group1-sha1'] |
- self.proto.supportedPublicKeys = ['ssh-rsa'] |
- self.proto.dataReceived(self.transport.value()) |
- e = pow(transport.DH_GENERATOR, 5000, |
- transport.DH_PRIME) |
- |
- self.proto.ssh_KEX_DH_GEX_REQUEST_OLD(common.MP(e)) |
- y = common.getMP('\x00\x00\x00\x40' + '\x99' * 64)[0] |
- f = common._MPpow(transport.DH_GENERATOR, y, transport.DH_PRIME) |
- sharedSecret = common._MPpow(e, y, transport.DH_PRIME) |
- |
- h = sha.new() |
- h.update(common.NS(self.proto.ourVersionString) * 2) |
- h.update(common.NS(self.proto.ourKexInitPayload) * 2) |
- h.update(common.NS(self.proto.factory.publicKeys['ssh-rsa'].blob())) |
- h.update(common.MP(e)) |
- h.update(f) |
- h.update(sharedSecret) |
- exchangeHash = h.digest() |
- |
- signature = self.proto.factory.privateKeys['ssh-rsa'].sign( |
- exchangeHash) |
- |
- self.assertEquals( |
- self.packets, |
- [(transport.MSG_KEXDH_REPLY, |
- common.NS(self.proto.factory.publicKeys['ssh-rsa'].blob()) |
- + f + common.NS(signature)), |
- (transport.MSG_NEWKEYS, '')]) |
- |
- |
- def test_KEX_DH_GEX_REQUEST_OLD(self): |
- """ |
- Test that the KEX_DH_GEX_REQUEST_OLD message causes the server |
- to reply with a KEX_DH_GEX_GROUP message with the correct |
- Diffie-Hellman group. |
- """ |
- self.proto.supportedKeyExchanges = [ |
- 'diffie-hellman-group-exchange-sha1'] |
- self.proto.supportedPublicKeys = ['ssh-rsa'] |
- self.proto.dataReceived(self.transport.value()) |
- self.proto.ssh_KEX_DH_GEX_REQUEST_OLD('\x00\x00\x04\x00') |
- self.assertEquals( |
- self.packets, |
- [(transport.MSG_KEX_DH_GEX_GROUP, |
- common.MP(transport.DH_PRIME) + '\x00\x00\x00\x01\x02')]) |
- self.assertEquals(self.proto.g, 2) |
- self.assertEquals(self.proto.p, transport.DH_PRIME) |
- |
- |
- def test_KEX_DH_GEX_REQUEST_OLD_badKexAlg(self): |
- """ |
- Test that if the server recieves a KEX_DH_GEX_REQUEST_OLD message |
- and the key exchange algorithm is not 'diffie-hellman-group1-sha1' or |
- 'diffie-hellman-group-exchange-sha1', we raise a ConchError. |
- """ |
- self.proto.kexAlg = None |
- self.assertRaises(ConchError, self.proto.ssh_KEX_DH_GEX_REQUEST_OLD, |
- None) |
- |
- |
- def test_KEX_DH_GEX_REQUEST(self): |
- """ |
- Test that the KEX_DH_GEX_REQUEST message causes the server to reply |
- with a KEX_DH_GEX_GROUP message with the correct Diffie-Hellman |
- group. |
- """ |
- self.proto.supportedKeyExchanges = [ |
- 'diffie-hellman-group-exchange-sha1'] |
- self.proto.supportedPublicKeys = ['ssh-rsa'] |
- self.proto.dataReceived(self.transport.value()) |
- self.proto.ssh_KEX_DH_GEX_REQUEST('\x00\x00\x04\x00\x00\x00\x08\x00' + |
- '\x00\x00\x0c\x00') |
- self.assertEquals( |
- self.packets, |
- [(transport.MSG_KEX_DH_GEX_GROUP, |
- common.MP(transport.DH_PRIME) + '\x00\x00\x00\x01\x03')]) |
- self.assertEquals(self.proto.g, 3) |
- self.assertEquals(self.proto.p, transport.DH_PRIME) |
- |
- |
- def test_KEX_DH_GEX_INIT_after_REQUEST(self): |
- """ |
- Test that the KEX_DH_GEX_INIT message after the client sends |
- KEX_DH_GEX_REQUEST causes the server to send a KEX_DH_GEX_INIT message |
- with a public key and signature. |
- """ |
- self.test_KEX_DH_GEX_REQUEST() |
- e = pow(self.proto.g, 3, self.proto.p) |
- y = common.getMP('\x00\x00\x00\x80' + '\x99' * 128)[0] |
- f = common._MPpow(self.proto.g, y, self.proto.p) |
- sharedSecret = common._MPpow(e, y, self.proto.p) |
- h = sha.new() |
- h.update(common.NS(self.proto.ourVersionString) * 2) |
- h.update(common.NS(self.proto.ourKexInitPayload) * 2) |
- h.update(common.NS(self.proto.factory.publicKeys['ssh-rsa'].blob())) |
- h.update('\x00\x00\x04\x00\x00\x00\x08\x00\x00\x00\x0c\x00') |
- h.update(common.MP(self.proto.p)) |
- h.update(common.MP(self.proto.g)) |
- h.update(common.MP(e)) |
- h.update(f) |
- h.update(sharedSecret) |
- exchangeHash = h.digest() |
- self.proto.ssh_KEX_DH_GEX_INIT(common.MP(e)) |
- self.assertEquals( |
- self.packets[1], |
- (transport.MSG_KEX_DH_GEX_REPLY, |
- common.NS(self.proto.factory.publicKeys['ssh-rsa'].blob()) + |
- f + common.NS(self.proto.factory.privateKeys['ssh-rsa'].sign( |
- exchangeHash)))) |
- |
- |
- def test_KEX_DH_GEX_INIT_after_REQUEST_OLD(self): |
- """ |
- Test that the KEX_DH_GEX_INIT message after the client sends |
- KEX_DH_GEX_REQUEST_OLD causes the server to sent a KEX_DH_GEX_INIT |
- message with a public key and signature. |
- """ |
- self.test_KEX_DH_GEX_REQUEST_OLD() |
- e = pow(self.proto.g, 3, self.proto.p) |
- y = common.getMP('\x00\x00\x00\x80' + '\x99' * 128)[0] |
- f = common._MPpow(self.proto.g, y, self.proto.p) |
- sharedSecret = common._MPpow(e, y, self.proto.p) |
- h = sha.new() |
- h.update(common.NS(self.proto.ourVersionString) * 2) |
- h.update(common.NS(self.proto.ourKexInitPayload) * 2) |
- h.update(common.NS(self.proto.factory.publicKeys['ssh-rsa'].blob())) |
- h.update('\x00\x00\x04\x00') |
- h.update(common.MP(self.proto.p)) |
- h.update(common.MP(self.proto.g)) |
- h.update(common.MP(e)) |
- h.update(f) |
- h.update(sharedSecret) |
- exchangeHash = h.digest() |
- self.proto.ssh_KEX_DH_GEX_INIT(common.MP(e)) |
- self.assertEquals( |
- self.packets[1:], |
- [(transport.MSG_KEX_DH_GEX_REPLY, |
- common.NS(self.proto.factory.publicKeys['ssh-rsa'].blob()) + |
- f + common.NS(self.proto.factory.privateKeys['ssh-rsa'].sign( |
- exchangeHash))), |
- (transport.MSG_NEWKEYS, '')]) |
- |
- |
- def test_keySetup(self): |
- """ |
- Test that _keySetup sets up the next encryption keys. |
- """ |
- self.proto.nextEncryptions = MockCipher() |
- self.proto._keySetup('AB', 'CD') |
- self.assertEquals(self.proto.sessionID, 'CD') |
- self.proto._keySetup('AB', 'EF') |
- self.assertEquals(self.proto.sessionID, 'CD') |
- self.assertEquals(self.packets[-1], (transport.MSG_NEWKEYS, '')) |
- newKeys = [self.proto._getKey(c, 'AB', 'EF') for c in 'ABCDEF'] |
- self.assertEquals( |
- self.proto.nextEncryptions.keys, |
- (newKeys[1], newKeys[3], newKeys[0], newKeys[2], newKeys[5], |
- newKeys[4])) |
- |
- |
- def test_NEWKEYS(self): |
- """ |
- Test that NEWKEYS transitions the keys in nextEncryptions to |
- currentEncryptions. |
- """ |
- self.test_KEXINIT() |
- |
- self.proto.nextEncryptions = transport.SSHCiphers('none', 'none', |
- 'none', 'none') |
- self.proto.ssh_NEWKEYS('') |
- self.assertIdentical(self.proto.currentEncryptions, |
- self.proto.nextEncryptions) |
- self.assertIdentical(self.proto.outgoingCompression, None) |
- self.assertIdentical(self.proto.incomingCompression, None) |
- self.proto.outgoingCompressionType = 'zlib' |
- self.proto.ssh_NEWKEYS('') |
- self.failIfIdentical(self.proto.outgoingCompression, None) |
- self.proto.incomingCompressionType = 'zlib' |
- self.proto.ssh_NEWKEYS('') |
- self.failIfIdentical(self.proto.incomingCompression, None) |
- |
- |
- def test_SERVICE_REQUEST(self): |
- """ |
- Test that the SERVICE_REQUEST message requests and starts a |
- service. |
- """ |
- self.proto.ssh_SERVICE_REQUEST(common.NS('ssh-userauth')) |
- self.assertEquals(self.packets, [(transport.MSG_SERVICE_ACCEPT, |
- common.NS('ssh-userauth'))]) |
- self.assertEquals(self.proto.service.name, 'MockService') |
- |
- |
- def test_disconnectNEWKEYSData(self): |
- """ |
- Test that NEWKEYS disconnects if it receives data. |
- """ |
- self.proto.ssh_NEWKEYS("bad packet") |
- self.checkDisconnected() |
- |
- |
- def test_disconnectSERVICE_REQUESTBadService(self): |
- """ |
- Test that SERVICE_REQUESTS disconnects if an unknown service is |
- requested. |
- """ |
- self.proto.ssh_SERVICE_REQUEST(common.NS('no service')) |
- self.checkDisconnected(transport.DISCONNECT_SERVICE_NOT_AVAILABLE) |
- |
- |
- |
-class ClientSSHTransportTestCase(ServerAndClientSSHTransportBaseCase, |
- TransportTestCase): |
- """ |
- Tests for SSHClientTransport. |
- """ |
- |
- klass = transport.SSHClientTransport |
- |
- |
- def test_KEXINIT(self): |
- """ |
- Test that receiving a KEXINIT packet sets up the correct values on the |
- client. The way algorithms are picks is that the first item in the |
- client's list that is also in the server's list is chosen. |
- """ |
- self.proto.dataReceived( 'SSH-2.0-Twisted\r\n\x00\x00\x01\xd4\t\x14' |
- '\x99\x99\x99\x99\x99\x99\x99\x99\x99\x99\x99\x99\x99\x99\x99' |
- '\x99\x00\x00\x00=diffie-hellman-group1-sha1,diffie-hellman-g' |
- 'roup-exchange-sha1\x00\x00\x00\x0fssh-dss,ssh-rsa\x00\x00\x00' |
- '\x85aes128-ctr,aes128-cbc,aes192-ctr,aes192-cbc,aes256-ctr,ae' |
- 's256-cbc,cast128-ctr,cast128-cbc,blowfish-ctr,blowfish-cbc,3d' |
- 'es-ctr,3des-cbc\x00\x00\x00\x85aes128-ctr,aes128-cbc,aes192-c' |
- 'tr,aes192-cbc,aes256-ctr,aes256-cbc,cast128-ctr,cast128-cbc,b' |
- 'lowfish-ctr,blowfish-cbc,3des-ctr,3des-cbc\x00\x00\x00\x12hma' |
- 'c-md5,hmac-sha1\x00\x00\x00\x12hmac-md5,hmac-sha1\x00\x00\x00' |
- '\tzlib,none\x00\x00\x00\tzlib,none\x00\x00\x00\x00\x00\x00' |
- '\x00\x00\x00\x00\x00\x00\x00\x99\x99\x99\x99\x99\x99\x99\x99' |
- '\x99') |
- self.assertEquals(self.proto.kexAlg, |
- 'diffie-hellman-group-exchange-sha1') |
- self.assertEquals(self.proto.keyAlg, |
- 'ssh-rsa') |
- self.assertEquals(self.proto.outgoingCompressionType, |
- 'none') |
- self.assertEquals(self.proto.incomingCompressionType, |
- 'none') |
- ne = self.proto.nextEncryptions |
- self.assertEquals(ne.outCipType, 'aes256-ctr') |
- self.assertEquals(ne.inCipType, 'aes256-ctr') |
- self.assertEquals(ne.outMACType, 'hmac-sha1') |
- self.assertEquals(ne.inMACType, 'hmac-sha1') |
- |
- |
- def verifyHostKey(self, pubKey, fingerprint): |
- """ |
- Mock version of SSHClientTransport.verifyHostKey. |
- """ |
- self.calledVerifyHostKey = True |
- self.assertEquals(pubKey, self.blob) |
- self.assertEquals(fingerprint.replace(':', ''), |
- md5.new(pubKey).hexdigest()) |
- return defer.succeed(True) |
- |
- |
- def setUp(self): |
- TransportTestCase.setUp(self) |
- self.blob = keys.Key.fromString(keydata.publicRSA_openssh).blob() |
- self.privObj = keys.Key.fromString(keydata.privateRSA_openssh) |
- self.calledVerifyHostKey = False |
- self.proto.verifyHostKey = self.verifyHostKey |
- |
- |
- def test_notImplementedClientMethods(self): |
- """ |
- verifyHostKey() should return a Deferred which fails with a |
- NotImplementedError exception. connectionSecure() should raise |
- NotImplementedError(). |
- """ |
- self.assertRaises(NotImplementedError, self.klass().connectionSecure) |
- def _checkRaises(f): |
- f.trap(NotImplementedError) |
- d = self.klass().verifyHostKey(None, None) |
- return d.addCallback(self.fail).addErrback(_checkRaises) |
- |
- |
- def test_KEXINIT_groupexchange(self): |
- """ |
- Test that a KEXINIT packet with a group-exchange key exchange results |
- in a KEX_DH_GEX_REQUEST_OLD message.. |
- """ |
- self.proto.supportedKeyExchanges = [ |
- 'diffie-hellman-group-exchange-sha1'] |
- self.proto.dataReceived(self.transport.value()) |
- self.assertEquals(self.packets, [(transport.MSG_KEX_DH_GEX_REQUEST_OLD, |
- '\x00\x00\x08\x00')]) |
- |
- |
- def test_KEXINIT_group1(self): |
- """ |
- Like test_KEXINIT_groupexchange, but for the group-1 key exchange. |
- """ |
- self.proto.supportedKeyExchanges = ['diffie-hellman-group1-sha1'] |
- self.proto.dataReceived(self.transport.value()) |
- self.assertEquals(common.MP(self.proto.x)[5:], '\x99' * 64) |
- self.assertEquals(self.packets, |
- [(transport.MSG_KEXDH_INIT, self.proto.e)]) |
- |
- |
- def test_KEXINIT_badKexAlg(self): |
- """ |
- Test that the client raises a ConchError if it receives a |
- KEXINIT message bug doesn't have a key exchange algorithm that we |
- understand. |
- """ |
- self.proto.supportedKeyExchanges = ['diffie-hellman-group2-sha1'] |
- data = self.transport.value().replace('group1', 'group2') |
- self.assertRaises(ConchError, self.proto.dataReceived, data) |
- |
- |
- def test_KEXDH_REPLY(self): |
- """ |
- Test that the KEXDH_REPLY message verifies the server. |
- """ |
- self.test_KEXINIT_group1() |
- |
- sharedSecret = common._MPpow(transport.DH_GENERATOR, |
- self.proto.x, transport.DH_PRIME) |
- h = sha.new() |
- h.update(common.NS(self.proto.ourVersionString) * 2) |
- h.update(common.NS(self.proto.ourKexInitPayload) * 2) |
- h.update(common.NS(self.blob)) |
- h.update(self.proto.e) |
- h.update('\x00\x00\x00\x01\x02') # f |
- h.update(sharedSecret) |
- exchangeHash = h.digest() |
- |
- def _cbTestKEXDH_REPLY(value): |
- self.assertIdentical(value, None) |
- self.assertEquals(self.calledVerifyHostKey, True) |
- self.assertEquals(self.proto.sessionID, exchangeHash) |
- |
- signature = self.privObj.sign(exchangeHash) |
- |
- d = self.proto.ssh_KEX_DH_GEX_GROUP( |
- (common.NS(self.blob) + '\x00\x00\x00\x01\x02' + |
- common.NS(signature))) |
- d.addCallback(_cbTestKEXDH_REPLY) |
- |
- return d |
- |
- |
- def test_KEX_DH_GEX_GROUP(self): |
- """ |
- Test that the KEX_DH_GEX_GROUP message results in a |
- KEX_DH_GEX_INIT message with the client's Diffie-Hellman public key. |
- """ |
- self.test_KEXINIT_groupexchange() |
- self.proto.ssh_KEX_DH_GEX_GROUP( |
- '\x00\x00\x00\x01\x0f\x00\x00\x00\x01\x02') |
- self.assertEquals(self.proto.p, 15) |
- self.assertEquals(self.proto.g, 2) |
- self.assertEquals(common.MP(self.proto.x)[5:], '\x99' * 40) |
- self.assertEquals(self.proto.e, |
- common.MP(pow(2, self.proto.x, 15))) |
- self.assertEquals(self.packets[1:], [(transport.MSG_KEX_DH_GEX_INIT, |
- self.proto.e)]) |
- |
- |
- def test_KEX_DH_GEX_REPLY(self): |
- """ |
- Test that the KEX_DH_GEX_REPLY message results in a verified |
- server. |
- """ |
- |
- self.test_KEX_DH_GEX_GROUP() |
- sharedSecret = common._MPpow(3, self.proto.x, self.proto.p) |
- h = sha.new() |
- h.update(common.NS(self.proto.ourVersionString) * 2) |
- h.update(common.NS(self.proto.ourKexInitPayload) * 2) |
- h.update(common.NS(self.blob)) |
- h.update('\x00\x00\x08\x00\x00\x00\x00\x01\x0f\x00\x00\x00\x01\x02') |
- h.update(self.proto.e) |
- h.update('\x00\x00\x00\x01\x03') # f |
- h.update(sharedSecret) |
- exchangeHash = h.digest() |
- |
- def _cbTestKEX_DH_GEX_REPLY(value): |
- self.assertIdentical(value, None) |
- self.assertEquals(self.calledVerifyHostKey, True) |
- self.assertEquals(self.proto.sessionID, exchangeHash) |
- |
- signature = self.privObj.sign(exchangeHash) |
- |
- d = self.proto.ssh_KEX_DH_GEX_REPLY( |
- common.NS(self.blob) + |
- '\x00\x00\x00\x01\x03' + |
- common.NS(signature)) |
- d.addCallback(_cbTestKEX_DH_GEX_REPLY) |
- return d |
- |
- |
- def test_keySetup(self): |
- """ |
- Test that _keySetup sets up the next encryption keys. |
- """ |
- self.proto.nextEncryptions = MockCipher() |
- self.proto._keySetup('AB', 'CD') |
- self.assertEquals(self.proto.sessionID, 'CD') |
- self.proto._keySetup('AB', 'EF') |
- self.assertEquals(self.proto.sessionID, 'CD') |
- self.assertEquals(self.packets[-1], (transport.MSG_NEWKEYS, '')) |
- newKeys = [self.proto._getKey(c, 'AB', 'EF') for c in 'ABCDEF'] |
- self.assertEquals(self.proto.nextEncryptions.keys, |
- (newKeys[0], newKeys[2], newKeys[1], newKeys[3], |
- newKeys[4], newKeys[5])) |
- |
- |
- def test_NEWKEYS(self): |
- """ |
- Test that NEWKEYS transitions the keys from nextEncryptions to |
- currentEncryptions. |
- """ |
- self.test_KEXINIT() |
- secure = [False] |
- def stubConnectionSecure(): |
- secure[0] = True |
- self.proto.connectionSecure = stubConnectionSecure |
- |
- self.proto.nextEncryptions = transport.SSHCiphers('none', 'none', |
- 'none', 'none') |
- self.proto.ssh_NEWKEYS('') |
- |
- self.failIfIdentical(self.proto.currentEncryptions, |
- self.proto.nextEncryptions) |
- |
- self.proto.nextEncryptions = MockCipher() |
- self.proto._keySetup('AB', 'EF') |
- self.assertIdentical(self.proto.outgoingCompression, None) |
- self.assertIdentical(self.proto.incomingCompression, None) |
- self.assertIdentical(self.proto.currentEncryptions, |
- self.proto.nextEncryptions) |
- self.assertTrue(secure[0]) |
- self.proto.outgoingCompressionType = 'zlib' |
- self.proto.ssh_NEWKEYS('') |
- self.failIfIdentical(self.proto.outgoingCompression, None) |
- self.proto.incomingCompressionType = 'zlib' |
- self.proto.ssh_NEWKEYS('') |
- self.failIfIdentical(self.proto.incomingCompression, None) |
- |
- |
- def test_SERVICE_ACCEPT(self): |
- """ |
- Test that the SERVICE_ACCEPT packet starts the requested service. |
- """ |
- self.proto.instance = MockService() |
- self.proto.ssh_SERVICE_ACCEPT('\x00\x00\x00\x0bMockService') |
- self.assertTrue(self.proto.instance.started) |
- |
- |
- def test_requestService(self): |
- """ |
- Test that requesting a service sends a SERVICE_REQUEST packet. |
- """ |
- self.proto.requestService(MockService()) |
- self.assertEquals(self.packets, [(transport.MSG_SERVICE_REQUEST, |
- '\x00\x00\x00\x0bMockService')]) |
- |
- |
- def test_disconnectKEXDH_REPLYBadSignature(self): |
- """ |
- Test that KEXDH_REPLY disconnects if the signature is bad. |
- """ |
- self.test_KEXDH_REPLY() |
- self.proto._continueKEXDH_REPLY(None, self.blob, 3, "bad signature") |
- self.checkDisconnected(transport.DISCONNECT_KEY_EXCHANGE_FAILED) |
- |
- |
- def test_disconnectGEX_REPLYBadSignature(self): |
- """ |
- Like test_disconnectKEXDH_REPLYBadSignature, but for DH_GEX_REPLY. |
- """ |
- self.test_KEX_DH_GEX_REPLY() |
- self.proto._continueGEX_REPLY(None, self.blob, 3, "bad signature") |
- self.checkDisconnected(transport.DISCONNECT_KEY_EXCHANGE_FAILED) |
- |
- |
- def test_disconnectNEWKEYSData(self): |
- """ |
- Test that NEWKEYS disconnects if it receives data. |
- """ |
- self.proto.ssh_NEWKEYS("bad packet") |
- self.checkDisconnected() |
- |
- |
- def test_disconnectSERVICE_ACCEPT(self): |
- """ |
- Test that SERVICE_ACCEPT disconnects if the accepted protocol is |
- differet from the asked-for protocol. |
- """ |
- self.proto.instance = MockService() |
- self.proto.ssh_SERVICE_ACCEPT('\x00\x00\x00\x03bad') |
- self.checkDisconnected() |
- |
- |
- |
-class SSHCiphersTestCase(unittest.TestCase): |
- """ |
- Tests for the SSHCiphers helper class. |
- """ |
- if Crypto is None: |
- skip = "cannot run w/o PyCrypto" |
- |
- |
- def test_init(self): |
- """ |
- Test that the initializer sets up the SSHCiphers object. |
- """ |
- ciphers = transport.SSHCiphers('A', 'B', 'C', 'D') |
- self.assertEquals(ciphers.outCipType, 'A') |
- self.assertEquals(ciphers.inCipType, 'B') |
- self.assertEquals(ciphers.outMACType, 'C') |
- self.assertEquals(ciphers.inMACType, 'D') |
- |
- |
- def test_getCipher(self): |
- """ |
- Test that the _getCipher method returns the correct cipher. |
- """ |
- ciphers = transport.SSHCiphers('A', 'B', 'C', 'D') |
- iv = key = '\x00' * 16 |
- for cipName, (modName, keySize, counter) in ciphers.cipherMap.items(): |
- cip = ciphers._getCipher(cipName, iv, key) |
- if cipName == 'none': |
- self.assertIsInstance(cip, transport._DummyCipher) |
- else: |
- self.assertTrue(str(cip).startswith('<' + modName)) |
- |
- |
- def test_getMAC(self): |
- """ |
- Test that the _getMAC method returns the correct MAC. |
- """ |
- ciphers = transport.SSHCiphers('A', 'B', 'C', 'D') |
- key = '\x00' * 64 |
- for macName, mac in ciphers.macMap.items(): |
- mod = ciphers._getMAC(macName, key) |
- if macName == 'none': |
- self.assertIdentical(mac, None) |
- else: |
- self.assertEquals(mod[0], mac) |
- self.assertEquals(mod[1], |
- Crypto.Cipher.XOR.new('\x36').encrypt(key)) |
- self.assertEquals(mod[2], |
- Crypto.Cipher.XOR.new('\x5c').encrypt(key)) |
- self.assertEquals(mod[3], len(mod[0].new().digest())) |
- |
- |
- def test_setKeysCiphers(self): |
- """ |
- Test that setKeys sets up the ciphers. |
- """ |
- key = '\x00' * 64 |
- cipherItems = transport.SSHCiphers.cipherMap.items() |
- for cipName, (modName, keySize, counter) in cipherItems: |
- encCipher = transport.SSHCiphers(cipName, 'none', 'none', 'none') |
- decCipher = transport.SSHCiphers('none', cipName, 'none', 'none') |
- cip = encCipher._getCipher(cipName, key, key) |
- bs = cip.block_size |
- encCipher.setKeys(key, key, '', '', '', '') |
- decCipher.setKeys('', '', key, key, '', '') |
- self.assertEquals(encCipher.encBlockSize, bs) |
- self.assertEquals(decCipher.decBlockSize, bs) |
- enc = cip.encrypt(key[:bs]) |
- enc2 = cip.encrypt(key[:bs]) |
- if counter: |
- self.failIfEquals(enc, enc2) |
- self.assertEquals(encCipher.encrypt(key[:bs]), enc) |
- self.assertEquals(encCipher.encrypt(key[:bs]), enc2) |
- self.assertEquals(decCipher.decrypt(enc), key[:bs]) |
- self.assertEquals(decCipher.decrypt(enc2), key[:bs]) |
- |
- |
- def test_setKeysMACs(self): |
- """ |
- Test that setKeys sets up the MACs. |
- """ |
- key = '\x00' * 64 |
- for macName, mod in transport.SSHCiphers.macMap.items(): |
- outMac = transport.SSHCiphers('none', 'none', macName, 'none') |
- inMac = transport.SSHCiphers('none', 'none', 'none', macName) |
- outMac.setKeys('', '', '', '', key, '') |
- inMac.setKeys('', '', '', '', '', key) |
- if mod: |
- ds = mod.digest_size |
- else: |
- ds = 0 |
- self.assertEquals(inMac.verifyDigestSize, ds) |
- if mod: |
- mod, i, o, ds = outMac._getMAC(macName, key) |
- seqid = 0 |
- data = key |
- packet = '\x00' * 4 + key |
- if mod: |
- mac = mod.new(o + mod.new(i + packet).digest()).digest() |
- else: |
- mac = '' |
- self.assertEquals(outMac.makeMAC(seqid, data), mac) |
- self.assertTrue(inMac.verify(seqid, data, mac)) |
- |
- |
- |
-class CounterTestCase(unittest.TestCase): |
- """ |
- Tests for the _Counter helper class. |
- """ |
- if Crypto is None: |
- skip = "cannot run w/o PyCrypto" |
- |
- |
- def test_init(self): |
- """ |
- Test that the counter is initialized correctly. |
- """ |
- counter = transport._Counter('\x00' * 8 + '\xff' * 8, 8) |
- self.assertEquals(counter.blockSize, 8) |
- self.assertEquals(counter.count.tostring(), '\x00' * 8) |
- |
- |
- def test_count(self): |
- """ |
- Test that the counter counts incrementally and wraps at the top. |
- """ |
- counter = transport._Counter('\x00', 1) |
- self.assertEquals(counter(), '\x01') |
- self.assertEquals(counter(), '\x02') |
- [counter() for i in range(252)] |
- self.assertEquals(counter(), '\xff') |
- self.assertEquals(counter(), '\x00') |
- |
- |
- |
-class TransportLoopbackTestCase(unittest.TestCase): |
- """ |
- Test the server transport and client transport against each other, |
- """ |
- if Crypto is None: |
- skip = "cannot run w/o PyCrypto" |
- |
- |
- def _runClientServer(self, mod): |
- """ |
- Run an async client and server, modifying each using the mod function |
- provided. Returns a Deferred called back when both Protocols have |
- disconnected. |
- |
- @type mod: C{func} |
- @rtype: C{defer.Deferred} |
- """ |
- factory = MockFactory() |
- server = transport.SSHServerTransport() |
- server.factory = factory |
- factory.startFactory() |
- server.errors = [] |
- server.receiveError = lambda code, desc: server.errors.append(( |
- code, desc)) |
- client = transport.SSHClientTransport() |
- client.verifyHostKey = lambda x, y: defer.succeed(None) |
- client.errors = [] |
- client.receiveError = lambda code, desc: client.errors.append(( |
- code, desc)) |
- client.connectionSecure = lambda: client.loseConnection() |
- server = mod(server) |
- client = mod(client) |
- def check(ignored, server, client): |
- name = repr([server.supportedCiphers[0], |
- server.supportedMACs[0], |
- server.supportedKeyExchanges[0], |
- server.supportedCompressions[0]]) |
- self.assertEquals(client.errors, []) |
- self.assertEquals(server.errors, [( |
- transport.DISCONNECT_CONNECTION_LOST, |
- "user closed connection")]) |
- if server.supportedCiphers[0] == 'none': |
- self.assertFalse(server.isEncrypted(), name) |
- self.assertFalse(client.isEncrypted(), name) |
- else: |
- self.assertTrue(server.isEncrypted(), name) |
- self.assertTrue(client.isEncrypted(), name) |
- if server.supportedMACs[0] == 'none': |
- self.assertFalse(server.isVerified(), name) |
- self.assertFalse(client.isVerified(), name) |
- else: |
- self.assertTrue(server.isVerified(), name) |
- self.assertTrue(client.isVerified(), name) |
- |
- d = loopback.loopbackAsync(server, client) |
- d.addCallback(check, server, client) |
- return d |
- |
- |
- def test_ciphers(self): |
- """ |
- Test that the client and server play nicely together, in all |
- the various combinations of ciphers. |
- """ |
- deferreds = [] |
- for cipher in transport.SSHTransportBase.supportedCiphers + ['none']: |
- def setCipher(proto): |
- proto.supportedCiphers = [cipher] |
- return proto |
- deferreds.append(self._runClientServer(setCipher)) |
- return defer.DeferredList(deferreds, fireOnOneErrback=True) |
- |
- |
- def test_macs(self): |
- """ |
- Like test_ciphers, but for the various MACs. |
- """ |
- deferreds = [] |
- for mac in transport.SSHTransportBase.supportedMACs + ['none']: |
- def setMAC(proto): |
- proto.supportedMACs = [mac] |
- return proto |
- deferreds.append(self._runClientServer(setMAC)) |
- return defer.DeferredList(deferreds, fireOnOneErrback=True) |
- |
- |
- def test_keyexchanges(self): |
- """ |
- Like test_ciphers, but for the various key exchanges. |
- """ |
- deferreds = [] |
- for kex in transport.SSHTransportBase.supportedKeyExchanges: |
- def setKeyExchange(proto): |
- proto.supportedKeyExchanges = [kex] |
- return proto |
- deferreds.append(self._runClientServer(setKeyExchange)) |
- return defer.DeferredList(deferreds, fireOnOneErrback=True) |
- |
- |
- def test_compressions(self): |
- """ |
- Like test_ciphers, but for the various compressions. |
- """ |
- deferreds = [] |
- for compression in transport.SSHTransportBase.supportedCompressions: |
- def setCompression(proto): |
- proto.supportedCompressions = [compression] |
- return proto |
- deferreds.append(self._runClientServer(setCompression)) |
- return defer.DeferredList(deferreds, fireOnOneErrback=True) |
- |
- |
- |
-class OldFactoryTestCase(unittest.TestCase): |
- """ |
- The old C{SSHFactory.getPublicKeys}() returned mappings of key names to |
- strings of key blobs and mappings of key names to PyCrypto key objects from |
- C{SSHFactory.getPrivateKeys}() (they could also be specified with the |
- C{publicKeys} and C{privateKeys} attributes). This is no longer supported |
- by the C{SSHServerTransport}, so we warn the user if they create an old |
- factory. |
- """ |
- |
- |
- def test_getPublicKeysWarning(self): |
- """ |
- If the return value of C{getPublicKeys}() isn't a mapping from key |
- names to C{Key} objects, then warn the user and convert the mapping. |
- """ |
- sshFactory = MockOldFactoryPublicKeys() |
- self.assertWarns(DeprecationWarning, |
- "Returning a mapping from strings to strings from" |
- " getPublicKeys()/publicKeys (in %s) is deprecated. Return " |
- "a mapping from strings to Key objects instead." % |
- (qual(MockOldFactoryPublicKeys),), |
- factory.__file__, sshFactory.startFactory) |
- self.assertEquals(sshFactory.publicKeys, MockFactory().getPublicKeys()) |
- |
- |
- def test_getPrivateKeysWarning(self): |
- """ |
- If the return value of C{getPrivateKeys}() isn't a mapping from key |
- names to C{Key} objects, then warn the user and convert the mapping. |
- """ |
- sshFactory = MockOldFactoryPrivateKeys() |
- self.assertWarns(DeprecationWarning, |
- "Returning a mapping from strings to PyCrypto key objects from" |
- " getPrivateKeys()/privateKeys (in %s) is deprecated. Return" |
- " a mapping from strings to Key objects instead." % |
- (qual(MockOldFactoryPrivateKeys),), |
- factory.__file__, sshFactory.startFactory) |
- self.assertEquals(sshFactory.privateKeys, |
- MockFactory().getPrivateKeys()) |
- |
- |
- def test_publicKeysWarning(self): |
- """ |
- If the value of the C{publicKeys} attribute isn't a mapping from key |
- names to C{Key} objects, then warn the user and convert the mapping. |
- """ |
- sshFactory = MockOldFactoryPublicKeys() |
- sshFactory.publicKeys = sshFactory.getPublicKeys() |
- self.assertWarns(DeprecationWarning, |
- "Returning a mapping from strings to strings from" |
- " getPublicKeys()/publicKeys (in %s) is deprecated. Return " |
- "a mapping from strings to Key objects instead." % |
- (qual(MockOldFactoryPublicKeys),), |
- factory.__file__, sshFactory.startFactory) |
- self.assertEquals(sshFactory.publicKeys, MockFactory().getPublicKeys()) |
- |
- |
- def test_privateKeysWarning(self): |
- """ |
- If the return value of C{privateKeys} attribute isn't a mapping from |
- key names to C{Key} objects, then warn the user and convert the |
- mapping. |
- """ |
- sshFactory = MockOldFactoryPrivateKeys() |
- sshFactory.privateKeys = sshFactory.getPrivateKeys() |
- self.assertWarns(DeprecationWarning, |
- "Returning a mapping from strings to PyCrypto key objects from" |
- " getPrivateKeys()/privateKeys (in %s) is deprecated. Return" |
- " a mapping from strings to Key objects instead." % |
- (qual(MockOldFactoryPrivateKeys),), |
- factory.__file__, sshFactory.startFactory) |
- self.assertEquals(sshFactory.privateKeys, |
- MockFactory().getPrivateKeys()) |