| Index: third_party/twisted_8_1/twisted/conch/test/test_transport.py
|
| diff --git a/third_party/twisted_8_1/twisted/conch/test/test_transport.py b/third_party/twisted_8_1/twisted/conch/test/test_transport.py
|
| deleted file mode 100644
|
| index 2749adfb15db00e3040da1e51f4bd270b8a53d2d..0000000000000000000000000000000000000000
|
| --- a/third_party/twisted_8_1/twisted/conch/test/test_transport.py
|
| +++ /dev/null
|
| @@ -1,1930 +0,0 @@
|
| -# Copyright (c) 2001-2008 Twisted Matrix Laboratories.
|
| -# See LICENSE for details.
|
| -
|
| -"""
|
| -Tests for ssh/transport.py and the classes therein.
|
| -"""
|
| -
|
| -import md5, sha
|
| -
|
| -try:
|
| - import Crypto
|
| -except ImportError:
|
| - Crypto = None
|
| - class transport: # fictional modules to make classes work
|
| - class SSHTransportBase: pass
|
| - class SSHServerTransport: pass
|
| - class SSHClientTransport: pass
|
| - class factory:
|
| - class SSHFactory:
|
| - pass
|
| -else:
|
| - from twisted.conch.ssh import transport, common, keys, factory
|
| - from twisted.conch.test import keydata
|
| -
|
| -from twisted.trial import unittest
|
| -from twisted.internet import defer
|
| -from twisted.protocols import loopback
|
| -from twisted.python import randbytes
|
| -from twisted.python.reflect import qual
|
| -from twisted.conch.ssh import service
|
| -from twisted.test import proto_helpers
|
| -
|
| -from twisted.conch.error import ConchError
|
| -
|
| -
|
| -
|
| -class MockTransportBase(transport.SSHTransportBase):
|
| - """
|
| - A base class for the client and server protocols. Stores the messages
|
| - it receieves instead of ignoring them.
|
| -
|
| - @ivar errors: a list of tuples: (reasonCode, description)
|
| - @ivar unimplementeds: a list of integers: sequence number
|
| - @ivar debugs: a list of tuples: (alwaysDisplay, message, lang)
|
| - @ivar ignoreds: a list of strings: ignored data
|
| - """
|
| -
|
| -
|
| - def connectionMade(self):
|
| - """
|
| - Set up instance variables.
|
| - """
|
| - transport.SSHTransportBase.connectionMade(self)
|
| - self.errors = []
|
| - self.unimplementeds = []
|
| - self.debugs = []
|
| - self.ignoreds = []
|
| -
|
| -
|
| - def receiveError(self, reasonCode, description):
|
| - """
|
| - Store any errors received.
|
| -
|
| - @type reasonCode: C{int}
|
| - @type description: C{str}
|
| - """
|
| - self.errors.append((reasonCode, description))
|
| -
|
| -
|
| - def receiveUnimplemented(self, seqnum):
|
| - """
|
| - Store any unimplemented packet messages.
|
| -
|
| - @type seqnum: C{int}
|
| - """
|
| - self.unimplementeds.append(seqnum)
|
| -
|
| -
|
| - def receiveDebug(self, alwaysDisplay, message, lang):
|
| - """
|
| - Store any debug messages.
|
| -
|
| - @type alwaysDisplay: C{bool}
|
| - @type message: C{str}
|
| - @type lang: C{str}
|
| - """
|
| - self.debugs.append((alwaysDisplay, message, lang))
|
| -
|
| -
|
| - def ssh_IGNORE(self, packet):
|
| - """
|
| - Store any ignored data.
|
| -
|
| - @type packet: C{str}
|
| - """
|
| - self.ignoreds.append(packet)
|
| -
|
| -
|
| -class MockCipher(object):
|
| - """
|
| - A mocked-up version of twisted.conch.ssh.transport.SSHCiphers.
|
| - """
|
| - outCipType = 'test'
|
| - encBlockSize = 6
|
| - inCipType = 'test'
|
| - decBlockSize = 6
|
| - inMACType = 'test'
|
| - outMACType = 'test'
|
| - verifyDigestSize = 1
|
| - usedEncrypt = False
|
| - usedDecrypt = False
|
| - outMAC = (None, '', '', 1)
|
| - inMAC = (None, '', '', 1)
|
| - keys = ()
|
| -
|
| -
|
| - def encrypt(self, x):
|
| - """
|
| - Called to encrypt the packet. Simply record that encryption was used
|
| - and return the data unchanged.
|
| - """
|
| - self.usedEncrypt = True
|
| - if (len(x) % self.encBlockSize) != 0:
|
| - raise RuntimeError("length %i modulo blocksize %i is not 0: %i" %
|
| - (len(x), self.encBlockSize, len(x) % self.encBlockSize))
|
| - return x
|
| -
|
| -
|
| - def decrypt(self, x):
|
| - """
|
| - Called to decrypt the packet. Simply record that decryption was used
|
| - and return the data unchanged.
|
| - """
|
| - self.usedDecrypt = True
|
| - if (len(x) % self.encBlockSize) != 0:
|
| - raise RuntimeError("length %i modulo blocksize %i is not 0: %i" %
|
| - (len(x), self.decBlockSize, len(x) % self.decBlockSize))
|
| - return x
|
| -
|
| -
|
| - def makeMAC(self, outgoingPacketSequence, payload):
|
| - """
|
| - Make a Message Authentication Code by sending the character value of
|
| - the outgoing packet.
|
| - """
|
| - return chr(outgoingPacketSequence)
|
| -
|
| -
|
| - def verify(self, incomingPacketSequence, packet, macData):
|
| - """
|
| - Verify the Message Authentication Code by checking that the packet
|
| - sequence number is the same.
|
| - """
|
| - return chr(incomingPacketSequence) == macData
|
| -
|
| -
|
| - def setKeys(self, ivOut, keyOut, ivIn, keyIn, macIn, macOut):
|
| - """
|
| - Record the keys.
|
| - """
|
| - self.keys = (ivOut, keyOut, ivIn, keyIn, macIn, macOut)
|
| -
|
| -
|
| -
|
| -class MockCompression:
|
| - """
|
| - A mocked-up compression, based on the zlib interface. Instead of
|
| - compressing, it reverses the data and adds a 0x66 byte to the end.
|
| - """
|
| -
|
| -
|
| - def compress(self, payload):
|
| - return payload[::-1] # reversed
|
| -
|
| -
|
| - def decompress(self, payload):
|
| - return payload[:-1][::-1]
|
| -
|
| -
|
| - def flush(self, kind):
|
| - return '\x66'
|
| -
|
| -
|
| -
|
| -class MockService(service.SSHService):
|
| - """
|
| - A mocked-up service, based on twisted.conch.ssh.service.SSHService.
|
| -
|
| - @ivar started: True if this service has been started.
|
| - @ivar stopped: True if this service has been stopped.
|
| - """
|
| - name = "MockService"
|
| - started = False
|
| - stopped = False
|
| - protocolMessages = {0xff: "MSG_TEST", 71: "MSG_fiction"}
|
| -
|
| -
|
| - def logPrefix(self):
|
| - return "MockService"
|
| -
|
| -
|
| - def serviceStarted(self):
|
| - """
|
| - Record that the service was started.
|
| - """
|
| - self.started = True
|
| -
|
| -
|
| - def serviceStopped(self):
|
| - """
|
| - Record that the service was stopped.
|
| - """
|
| - self.stopped = True
|
| -
|
| -
|
| - def ssh_TEST(self, packet):
|
| - """
|
| - A message that this service responds to.
|
| - """
|
| - self.transport.sendPacket(0xff, packet)
|
| -
|
| -
|
| -class MockFactory(factory.SSHFactory):
|
| - """
|
| - A mocked-up factory based on twisted.conch.ssh.factory.SSHFactory.
|
| - """
|
| - services = {
|
| - 'ssh-userauth': MockService}
|
| -
|
| -
|
| - def getPublicKeys(self):
|
| - """
|
| - Return the public keys that authenticate this server.
|
| - """
|
| - return {
|
| - 'ssh-rsa': keys.Key.fromString(keydata.publicRSA_openssh),
|
| - 'ssh-dsa': keys.Key.fromString(keydata.publicDSA_openssh)}
|
| -
|
| -
|
| - def getPrivateKeys(self):
|
| - """
|
| - Return the private keys that authenticate this server.
|
| - """
|
| - return {
|
| - 'ssh-rsa': keys.Key.fromString(keydata.privateRSA_openssh),
|
| - 'ssh-dsa': keys.Key.fromString(keydata.privateDSA_openssh)}
|
| -
|
| -
|
| - def getPrimes(self):
|
| - """
|
| - Return the Diffie-Hellman primes that can be used for the
|
| - diffie-hellman-group-exchange-sha1 key exchange.
|
| - """
|
| - return {
|
| - 1024: ((2, transport.DH_PRIME),),
|
| - 2048: ((3, transport.DH_PRIME),),
|
| - 4096: ((5, 7),)}
|
| -
|
| -
|
| -
|
| -class MockOldFactoryPublicKeys(MockFactory):
|
| - """
|
| - The old SSHFactory returned mappings from key names to strings from
|
| - getPublicKeys(). We return those here for testing.
|
| - """
|
| -
|
| -
|
| - def getPublicKeys(self):
|
| - """
|
| - We used to map key types to public key blobs as strings.
|
| - """
|
| - keys = MockFactory.getPublicKeys(self)
|
| - for name, key in keys.items()[:]:
|
| - keys[name] = key.blob()
|
| - return keys
|
| -
|
| -
|
| -
|
| -class MockOldFactoryPrivateKeys(MockFactory):
|
| - """
|
| - The old SSHFactory returned mappings from key names to PyCrypto key
|
| - objects from getPrivateKeys(). We return those here for testing.
|
| - """
|
| -
|
| -
|
| - def getPrivateKeys(self):
|
| - """
|
| - We used to map key types to PyCrypto key objects.
|
| - """
|
| - keys = MockFactory.getPrivateKeys(self)
|
| - for name, key in keys.items()[:]:
|
| - keys[name] = key.keyObject
|
| - return keys
|
| -
|
| -
|
| -
|
| -class TransportTestCase(unittest.TestCase):
|
| - """
|
| - Base class for transport test cases.
|
| - """
|
| - klass = None
|
| -
|
| - if Crypto is None:
|
| - skip = "cannot run w/o PyCrypto"
|
| -
|
| -
|
| - def setUp(self):
|
| - self.transport = proto_helpers.StringTransport()
|
| - self.proto = self.klass()
|
| - self.packets = []
|
| - def secureRandom(len):
|
| - """
|
| - Return a consistent entropy value
|
| - """
|
| - return '\x99' * len
|
| - self.oldSecureRandom = randbytes.secureRandom
|
| - randbytes.secureRandom = secureRandom
|
| - def stubSendPacket(messageType, payload):
|
| - self.packets.append((messageType, payload))
|
| - self.proto.makeConnection(self.transport)
|
| - # we just let the kex packet go into the transport
|
| - self.proto.sendPacket = stubSendPacket
|
| -
|
| -
|
| - def tearDown(self):
|
| - randbytes.secureRandom = self.oldSecureRandom
|
| - self.oldSecureRandom = None
|
| -
|
| -
|
| -
|
| -class BaseSSHTransportTestCase(TransportTestCase):
|
| - """
|
| - Test TransportBase. It implements the non-server/client specific
|
| - parts of the SSH transport protocol.
|
| - """
|
| -
|
| - klass = MockTransportBase
|
| -
|
| -
|
| - def test_sendVersion(self):
|
| - """
|
| - Test that the first thing sent over the connection is the version
|
| - string.
|
| - """
|
| - # the other setup was done in the setup method
|
| - self.assertEquals(self.transport.value().split('\r\n', 1)[0],
|
| - "SSH-2.0-Twisted")
|
| -
|
| -
|
| - def test_sendPacketPlain(self):
|
| - """
|
| - Test that plain (unencrypted, uncompressed) packets are sent
|
| - correctly. The format is::
|
| - uint32 length (including type and padding length)
|
| - byte padding length
|
| - byte type
|
| - bytes[length-padding length-2] data
|
| - bytes[padding length] padding
|
| - """
|
| - proto = MockTransportBase()
|
| - proto.makeConnection(self.transport)
|
| - self.transport.clear()
|
| - message = ord('A')
|
| - payload = 'BCDEFG'
|
| - proto.sendPacket(message, payload)
|
| - value = self.transport.value()
|
| - self.assertEquals(value, '\x00\x00\x00\x0c\x04ABCDEFG\x99\x99\x99\x99')
|
| -
|
| -
|
| - def test_sendPacketEncrypted(self):
|
| - """
|
| - Test that packets sent while encryption is enabled are sent
|
| - correctly. The whole packet should be encrypted.
|
| - """
|
| - proto = MockTransportBase()
|
| - proto.makeConnection(self.transport)
|
| - proto.currentEncryptions = testCipher = MockCipher()
|
| - message = ord('A')
|
| - payload = 'BC'
|
| - self.transport.clear()
|
| - proto.sendPacket(message, payload)
|
| - self.assertTrue(testCipher.usedEncrypt)
|
| - value = self.transport.value()
|
| - self.assertEquals(value, '\x00\x00\x00\x08\x04ABC\x99\x99\x99\x99\x01')
|
| -
|
| -
|
| - def test_sendPacketCompressed(self):
|
| - """
|
| - Test that packets sent while compression is enabled are sent
|
| - correctly. The packet type and data should be encrypted.
|
| - """
|
| - proto = MockTransportBase()
|
| - proto.makeConnection(self.transport)
|
| - proto.outgoingCompression = MockCompression()
|
| - self.transport.clear()
|
| - proto.sendPacket(ord('A'), 'B')
|
| - value = self.transport.value()
|
| - self.assertEquals(
|
| - value,
|
| - '\x00\x00\x00\x0c\x08BA\x66\x99\x99\x99\x99\x99\x99\x99\x99')
|
| -
|
| -
|
| - def test_sendPacketBoth(self):
|
| - """
|
| - Test that packets sent while compression and encryption are
|
| - enabled are sent correctly. The packet type and data should be
|
| - compressed and then the whole packet should be encrypted.
|
| - """
|
| - proto = MockTransportBase()
|
| - proto.makeConnection(self.transport)
|
| - proto.currentEncryptions = testCipher = MockCipher()
|
| - proto.outgoingCompression = MockCompression()
|
| - message = ord('A')
|
| - payload = 'BC'
|
| - self.transport.clear()
|
| - proto.sendPacket(message, payload)
|
| - value = self.transport.value()
|
| - self.assertEquals(
|
| - value,
|
| - '\x00\x00\x00\x0e\x09CBA\x66\x99\x99\x99\x99\x99\x99\x99\x99\x99'
|
| - '\x01')
|
| -
|
| -
|
| - def test_getPacketPlain(self):
|
| - """
|
| - Test that packets are retrieved correctly out of the buffer when
|
| - no encryption is enabled.
|
| - """
|
| - proto = MockTransportBase()
|
| - proto.makeConnection(self.transport)
|
| - self.transport.clear()
|
| - proto.sendPacket(ord('A'), 'BC')
|
| - proto.buf = self.transport.value() + 'extra'
|
| - self.assertEquals(proto.getPacket(), 'ABC')
|
| - self.assertEquals(proto.buf, 'extra')
|
| -
|
| -
|
| - def test_getPacketEncrypted(self):
|
| - """
|
| - Test that encrypted packets are retrieved correctly.
|
| - See test_sendPacketEncrypted.
|
| - """
|
| - proto = MockTransportBase()
|
| - proto.sendKexInit = lambda: None # don't send packets
|
| - proto.makeConnection(self.transport)
|
| - self.transport.clear()
|
| - proto.currentEncryptions = testCipher = MockCipher()
|
| - proto.sendPacket(ord('A'), 'BCD')
|
| - value = self.transport.value()
|
| - proto.buf = value[:MockCipher.decBlockSize]
|
| - self.assertEquals(proto.getPacket(), None)
|
| - self.assertTrue(testCipher.usedDecrypt)
|
| - self.assertEquals(proto.first, '\x00\x00\x00\x0e\x09A')
|
| - proto.buf += value[MockCipher.decBlockSize:]
|
| - self.assertEquals(proto.getPacket(), 'ABCD')
|
| - self.assertEquals(proto.buf, '')
|
| -
|
| -
|
| - def test_getPacketCompressed(self):
|
| - """
|
| - Test that compressed packets are retrieved correctly. See
|
| - test_sendPacketCompressed.
|
| - """
|
| - proto = MockTransportBase()
|
| - proto.makeConnection(self.transport)
|
| - self.transport.clear()
|
| - proto.outgoingCompression = MockCompression()
|
| - proto.incomingCompression = proto.outgoingCompression
|
| - proto.sendPacket(ord('A'), 'BCD')
|
| - proto.buf = self.transport.value()
|
| - self.assertEquals(proto.getPacket(), 'ABCD')
|
| -
|
| -
|
| - def test_getPacketBoth(self):
|
| - """
|
| - Test that compressed and encrypted packets are retrieved correctly.
|
| - See test_sendPacketBoth.
|
| - """
|
| - proto = MockTransportBase()
|
| - proto.sendKexInit = lambda: None
|
| - proto.makeConnection(self.transport)
|
| - self.transport.clear()
|
| - proto.currentEncryptions = testCipher = MockCipher()
|
| - proto.outgoingCompression = MockCompression()
|
| - proto.incomingCompression = proto.outgoingCompression
|
| - proto.sendPacket(ord('A'), 'BCDEFG')
|
| - proto.buf = self.transport.value()
|
| - self.assertEquals(proto.getPacket(), 'ABCDEFG')
|
| -
|
| -
|
| - def test_ciphersAreValid(self):
|
| - """
|
| - Test that all the supportedCiphers are valid.
|
| - """
|
| - ciphers = transport.SSHCiphers('A', 'B', 'C', 'D')
|
| - iv = key = '\x00' * 16
|
| - for cipName in self.proto.supportedCiphers:
|
| - self.assertTrue(ciphers._getCipher(cipName, iv, key))
|
| -
|
| -
|
| - def test_sendKexInit(self):
|
| - """
|
| - Test that the KEXINIT (key exchange initiation) message is sent
|
| - correctly. Payload::
|
| - bytes[16] cookie
|
| - string key exchange algorithms
|
| - string public key algorithms
|
| - string outgoing ciphers
|
| - string incoming ciphers
|
| - string outgoing MACs
|
| - string incoming MACs
|
| - string outgoing compressions
|
| - string incoming compressions
|
| - bool first packet follows
|
| - uint32 0
|
| - """
|
| - value = self.transport.value().split('\r\n', 1)[1]
|
| - self.proto.buf = value
|
| - packet = self.proto.getPacket()
|
| - self.assertEquals(packet[0], chr(transport.MSG_KEXINIT))
|
| - self.assertEquals(packet[1:17], '\x99' * 16)
|
| - (kex, pubkeys, ciphers1, ciphers2, macs1, macs2, compressions1,
|
| - compressions2, languages1, languages2,
|
| - buf) = common.getNS(packet[17:], 10)
|
| -
|
| - self.assertEquals(kex, ','.join(self.proto.supportedKeyExchanges))
|
| - self.assertEquals(pubkeys, ','.join(self.proto.supportedPublicKeys))
|
| - self.assertEquals(ciphers1, ','.join(self.proto.supportedCiphers))
|
| - self.assertEquals(ciphers2, ','.join(self.proto.supportedCiphers))
|
| - self.assertEquals(macs1, ','.join(self.proto.supportedMACs))
|
| - self.assertEquals(macs2, ','.join(self.proto.supportedMACs))
|
| - self.assertEquals(compressions1,
|
| - ','.join(self.proto.supportedCompressions))
|
| - self.assertEquals(compressions2,
|
| - ','.join(self.proto.supportedCompressions))
|
| - self.assertEquals(languages1, ','.join(self.proto.supportedLanguages))
|
| - self.assertEquals(languages2, ','.join(self.proto.supportedLanguages))
|
| - self.assertEquals(buf, '\x00' * 5)
|
| -
|
| -
|
| - def test_sendDebug(self):
|
| - """
|
| - Test that debug messages are sent correctly. Payload::
|
| - bool always display
|
| - string debug message
|
| - string language
|
| - """
|
| - self.proto.sendDebug("test", True, 'en')
|
| - self.assertEquals(
|
| - self.packets,
|
| - [(transport.MSG_DEBUG,
|
| - "\x01\x00\x00\x00\x04test\x00\x00\x00\x02en")])
|
| -
|
| -
|
| - def test_receiveDebug(self):
|
| - """
|
| - Test that debug messages are received correctly. See test_sendDebug.
|
| - """
|
| - self.proto.dispatchMessage(
|
| - transport.MSG_DEBUG,
|
| - '\x01\x00\x00\x00\x04test\x00\x00\x00\x02en')
|
| - self.assertEquals(self.proto.debugs, [(True, 'test', 'en')])
|
| -
|
| -
|
| - def test_sendIgnore(self):
|
| - """
|
| - Test that ignored messages are sent correctly. Payload::
|
| - string ignored data
|
| - """
|
| - self.proto.sendIgnore("test")
|
| - self.assertEquals(
|
| - self.packets, [(transport.MSG_IGNORE,
|
| - '\x00\x00\x00\x04test')])
|
| -
|
| -
|
| - def test_receiveIgnore(self):
|
| - """
|
| - Test that ignored messages are received correctly. See
|
| - test_sendIgnore.
|
| - """
|
| - self.proto.dispatchMessage(transport.MSG_IGNORE, 'test')
|
| - self.assertEquals(self.proto.ignoreds, ['test'])
|
| -
|
| -
|
| - def test_sendUnimplemented(self):
|
| - """
|
| - Test that unimplemented messages are sent correctly. Payload::
|
| - uint32 sequence number
|
| - """
|
| - self.proto.sendUnimplemented()
|
| - self.assertEquals(
|
| - self.packets, [(transport.MSG_UNIMPLEMENTED,
|
| - '\x00\x00\x00\x00')])
|
| -
|
| -
|
| - def test_receiveUnimplemented(self):
|
| - """
|
| - Test that unimplemented messages are received correctly. See
|
| - test_sendUnimplemented.
|
| - """
|
| - self.proto.dispatchMessage(transport.MSG_UNIMPLEMENTED,
|
| - '\x00\x00\x00\xff')
|
| - self.assertEquals(self.proto.unimplementeds, [255])
|
| -
|
| -
|
| - def test_sendDisconnect(self):
|
| - """
|
| - Test that disconnection messages are sent correctly. Payload::
|
| - uint32 reason code
|
| - string reason description
|
| - string language
|
| - """
|
| - disconnected = [False]
|
| - def stubLoseConnection():
|
| - disconnected[0] = True
|
| - self.transport.loseConnection = stubLoseConnection
|
| - self.proto.sendDisconnect(0xff, "test")
|
| - self.assertEquals(
|
| - self.packets,
|
| - [(transport.MSG_DISCONNECT,
|
| - "\x00\x00\x00\xff\x00\x00\x00\x04test\x00\x00\x00\x00")])
|
| - self.assertTrue(disconnected[0])
|
| -
|
| -
|
| - def test_receiveDisconnect(self):
|
| - """
|
| - Test that disconnection messages are received correctly. See
|
| - test_sendDisconnect.
|
| - """
|
| - disconnected = [False]
|
| - def stubLoseConnection():
|
| - disconnected[0] = True
|
| - self.transport.loseConnection = stubLoseConnection
|
| - self.proto.dispatchMessage(transport.MSG_DISCONNECT,
|
| - '\x00\x00\x00\xff\x00\x00\x00\x04test')
|
| - self.assertEquals(self.proto.errors, [(255, 'test')])
|
| - self.assertTrue(disconnected[0])
|
| -
|
| -
|
| - def test_dataReceived(self):
|
| - """
|
| - Test that dataReceived parses packets and dispatches them to
|
| - ssh_* methods.
|
| - """
|
| - kexInit = [False]
|
| - def stubKEXINIT(packet):
|
| - kexInit[0] = True
|
| - self.proto.ssh_KEXINIT = stubKEXINIT
|
| - self.proto.dataReceived(self.transport.value())
|
| - self.assertTrue(self.proto.gotVersion)
|
| - self.assertEquals(self.proto.ourVersionString,
|
| - self.proto.otherVersionString)
|
| - self.assertTrue(kexInit[0])
|
| -
|
| -
|
| - def test_service(self):
|
| - """
|
| - Test that the transport can set the running service and dispatches
|
| - packets to the service's packetReceived method.
|
| - """
|
| - service = MockService()
|
| - self.proto.setService(service)
|
| - self.assertEquals(self.proto.service, service)
|
| - self.assertTrue(service.started)
|
| - self.proto.dispatchMessage(0xff, "test")
|
| - self.assertEquals(self.packets, [(0xff, "test")])
|
| -
|
| - service2 = MockService()
|
| - self.proto.setService(service2)
|
| - self.assertTrue(service2.started)
|
| - self.assertTrue(service.stopped)
|
| -
|
| - self.proto.connectionLost(None)
|
| - self.assertTrue(service2.stopped)
|
| -
|
| -
|
| - def test_avatar(self):
|
| - """
|
| - Test that the transport notifies the avatar of disconnections.
|
| - """
|
| - disconnected = [False]
|
| - def logout():
|
| - disconnected[0] = True
|
| - self.proto.logoutFunction = logout
|
| - self.proto.avatar = True
|
| -
|
| - self.proto.connectionLost(None)
|
| - self.assertTrue(disconnected[0])
|
| -
|
| -
|
| - def test_isEncrypted(self):
|
| - """
|
| - Test that the transport accurately reflects its encrypted status.
|
| - """
|
| - self.assertFalse(self.proto.isEncrypted('in'))
|
| - self.assertFalse(self.proto.isEncrypted('out'))
|
| - self.assertFalse(self.proto.isEncrypted('both'))
|
| - self.proto.currentEncryptions = MockCipher()
|
| - self.assertTrue(self.proto.isEncrypted('in'))
|
| - self.assertTrue(self.proto.isEncrypted('out'))
|
| - self.assertTrue(self.proto.isEncrypted('both'))
|
| - self.proto.currentEncryptions = transport.SSHCiphers('none', 'none',
|
| - 'none', 'none')
|
| - self.assertFalse(self.proto.isEncrypted('in'))
|
| - self.assertFalse(self.proto.isEncrypted('out'))
|
| - self.assertFalse(self.proto.isEncrypted('both'))
|
| -
|
| - self.assertRaises(TypeError, self.proto.isEncrypted, 'bad')
|
| -
|
| -
|
| - def test_isVerified(self):
|
| - """
|
| - Test that the transport accurately reflects its verified status.
|
| - """
|
| - self.assertFalse(self.proto.isVerified('in'))
|
| - self.assertFalse(self.proto.isVerified('out'))
|
| - self.assertFalse(self.proto.isVerified('both'))
|
| - self.proto.currentEncryptions = MockCipher()
|
| - self.assertTrue(self.proto.isVerified('in'))
|
| - self.assertTrue(self.proto.isVerified('out'))
|
| - self.assertTrue(self.proto.isVerified('both'))
|
| - self.proto.currentEncryptions = transport.SSHCiphers('none', 'none',
|
| - 'none', 'none')
|
| - self.assertFalse(self.proto.isVerified('in'))
|
| - self.assertFalse(self.proto.isVerified('out'))
|
| - self.assertFalse(self.proto.isVerified('both'))
|
| -
|
| - self.assertRaises(TypeError, self.proto.isVerified, 'bad')
|
| -
|
| -
|
| - def test_loseConnection(self):
|
| - """
|
| - Test that loseConnection sends a disconnect message and closes the
|
| - connection.
|
| - """
|
| - disconnected = [False]
|
| - def stubLoseConnection():
|
| - disconnected[0] = True
|
| - self.transport.loseConnection = stubLoseConnection
|
| - self.proto.loseConnection()
|
| - self.assertEquals(self.packets[0][0], transport.MSG_DISCONNECT)
|
| - self.assertEquals(self.packets[0][1][3],
|
| - chr(transport.DISCONNECT_CONNECTION_LOST))
|
| -
|
| -
|
| - def test_badVersion(self):
|
| - """
|
| - Test that the transport disconnects when it receives a bad version.
|
| - """
|
| - def testBad(version):
|
| - self.packets = []
|
| - self.proto.gotVersion = False
|
| - disconnected = [False]
|
| - def stubLoseConnection():
|
| - disconnected[0] = True
|
| - self.transport.loseConnection = stubLoseConnection
|
| - for c in version + '\r\n':
|
| - self.proto.dataReceived(c)
|
| - self.assertTrue(disconnected[0])
|
| - self.assertEquals(self.packets[0][0], transport.MSG_DISCONNECT)
|
| - self.assertEquals(
|
| - self.packets[0][1][3],
|
| - chr(transport.DISCONNECT_PROTOCOL_VERSION_NOT_SUPPORTED))
|
| - testBad('SSH-1.5-OpenSSH')
|
| - testBad('SSH-3.0-Twisted')
|
| - testBad('GET / HTTP/1.1')
|
| -
|
| -
|
| - def test_dataBeforeVersion(self):
|
| - """
|
| - Test that the transport ignores data sent before the version string.
|
| - """
|
| - proto = MockTransportBase()
|
| - proto.makeConnection(proto_helpers.StringTransport())
|
| - data = ("""here's some stuff beforehand
|
| -here's some other stuff
|
| -""" + proto.ourVersionString + "\r\n")
|
| - [proto.dataReceived(c) for c in data]
|
| - self.assertTrue(proto.gotVersion)
|
| - self.assertEquals(proto.otherVersionString, proto.ourVersionString)
|
| -
|
| -
|
| - def test_compatabilityVersion(self):
|
| - """
|
| - Test that the transport treats the compatbility version (1.99)
|
| - as equivalent to version 2.0.
|
| - """
|
| - proto = MockTransportBase()
|
| - proto.makeConnection(proto_helpers.StringTransport())
|
| - proto.dataReceived("SSH-1.99-OpenSSH\n")
|
| - self.assertTrue(proto.gotVersion)
|
| - self.assertEquals(proto.otherVersionString, "SSH-1.99-OpenSSH")
|
| -
|
| -
|
| - def test_badPackets(self):
|
| - """
|
| - Test that the transport disconnects with an error when it receives
|
| - bad packets.
|
| - """
|
| - def testBad(packet, error=transport.DISCONNECT_PROTOCOL_ERROR):
|
| - self.packets = []
|
| - self.proto.buf = packet
|
| - self.assertEquals(self.proto.getPacket(), None)
|
| - self.assertEquals(len(self.packets), 1)
|
| - self.assertEquals(self.packets[0][0], transport.MSG_DISCONNECT)
|
| - self.assertEquals(self.packets[0][1][3], chr(error))
|
| -
|
| - testBad('\xff' * 8) # big packet
|
| - testBad('\x00\x00\x00\x05\x00BCDE') # length not modulo blocksize
|
| - oldEncryptions = self.proto.currentEncryptions
|
| - self.proto.currentEncryptions = MockCipher()
|
| - testBad('\x00\x00\x00\x08\x06AB123456', # bad MAC
|
| - transport.DISCONNECT_MAC_ERROR)
|
| - self.proto.currentEncryptions.decrypt = lambda x: x[:-1]
|
| - testBad('\x00\x00\x00\x08\x06BCDEFGHIJK') # bad decryption
|
| - self.proto.currentEncryptions = oldEncryptions
|
| - self.proto.incomingCompression = MockCompression()
|
| - def stubDecompress(payload):
|
| - raise Exception('bad compression')
|
| - self.proto.incomingCompression.decompress = stubDecompress
|
| - testBad('\x00\x00\x00\x04\x00BCDE', # bad decompression
|
| - transport.DISCONNECT_COMPRESSION_ERROR)
|
| - self.flushLoggedErrors()
|
| -
|
| -
|
| - def test_unimplementedPackets(self):
|
| - """
|
| - Test that unimplemented packet types cause MSG_UNIMPLEMENTED packets
|
| - to be sent.
|
| - """
|
| - seqnum = self.proto.incomingPacketSequence
|
| - def checkUnimplemented(seqnum=seqnum):
|
| - self.assertEquals(self.packets[0][0],
|
| - transport.MSG_UNIMPLEMENTED)
|
| - self.assertEquals(self.packets[0][1][3], chr(seqnum))
|
| - self.proto.packets = []
|
| - seqnum += 1
|
| -
|
| - self.proto.dispatchMessage(40, '')
|
| - checkUnimplemented()
|
| - transport.messages[41] = 'MSG_fiction'
|
| - self.proto.dispatchMessage(41, '')
|
| - checkUnimplemented()
|
| - self.proto.dispatchMessage(60, '')
|
| - checkUnimplemented()
|
| - self.proto.setService(MockService())
|
| - self.proto.dispatchMessage(70, '')
|
| - checkUnimplemented()
|
| - self.proto.dispatchMessage(71, '')
|
| - checkUnimplemented()
|
| -
|
| -
|
| - def test_getKey(self):
|
| - """
|
| - Test that _getKey generates the correct keys.
|
| - """
|
| - self.proto.sessionID = 'EF'
|
| -
|
| - k1 = sha.new('AB' + 'CD'
|
| - + 'K' + self.proto.sessionID).digest()
|
| - k2 = sha.new('ABCD' + k1).digest()
|
| - self.assertEquals(self.proto._getKey('K', 'AB', 'CD'), k1 + k2)
|
| -
|
| -
|
| - def test_multipleClasses(self):
|
| - """
|
| - Test that multiple instances have distinct states.
|
| - """
|
| - proto = self.proto
|
| - proto.dataReceived(self.transport.value())
|
| - proto.currentEncryptions = MockCipher()
|
| - proto.outgoingCompression = MockCompression()
|
| - proto.incomingCompression = MockCompression()
|
| - proto.setService(MockService())
|
| - proto2 = MockTransportBase()
|
| - proto2.makeConnection(proto_helpers.StringTransport())
|
| - proto2.sendIgnore('')
|
| - self.failIfEquals(proto.gotVersion, proto2.gotVersion)
|
| - self.failIfEquals(proto.transport, proto2.transport)
|
| - self.failIfEquals(proto.outgoingPacketSequence,
|
| - proto2.outgoingPacketSequence)
|
| - self.failIfEquals(proto.incomingPacketSequence,
|
| - proto2.incomingPacketSequence)
|
| - self.failIfEquals(proto.currentEncryptions,
|
| - proto2.currentEncryptions)
|
| - self.failIfEquals(proto.service, proto2.service)
|
| -
|
| -
|
| -
|
| -class ServerAndClientSSHTransportBaseCase:
|
| - """
|
| - Tests that need to be run on both the server and the client.
|
| - """
|
| -
|
| -
|
| - def checkDisconnected(self, kind=None):
|
| - """
|
| - Helper function to check if the transport disconnected.
|
| - """
|
| - if kind is None:
|
| - kind = transport.DISCONNECT_PROTOCOL_ERROR
|
| - self.assertEquals(self.packets[-1][0], transport.MSG_DISCONNECT)
|
| - self.assertEquals(self.packets[-1][1][3], chr(kind))
|
| -
|
| -
|
| - def connectModifiedProtocol(self, protoModification,
|
| - kind=None):
|
| - """
|
| - Helper function to connect a modified protocol to the test protocol
|
| - and test for disconnection.
|
| - """
|
| - if kind is None:
|
| - kind = transport.DISCONNECT_KEY_EXCHANGE_FAILED
|
| - proto2 = self.klass()
|
| - protoModification(proto2)
|
| - proto2.makeConnection(proto_helpers.StringTransport())
|
| - self.proto.dataReceived(proto2.transport.value())
|
| - if kind:
|
| - self.checkDisconnected(kind)
|
| - return proto2
|
| -
|
| -
|
| - def test_disconnectIfCantMatchKex(self):
|
| - """
|
| - Test that the transport disconnects if it can't match the key
|
| - exchange
|
| - """
|
| - def blankKeyExchanges(proto2):
|
| - proto2.supportedKeyExchanges = []
|
| - self.connectModifiedProtocol(blankKeyExchanges)
|
| -
|
| -
|
| - def test_disconnectIfCantMatchKeyAlg(self):
|
| - """
|
| - Like test_disconnectIfCantMatchKex, but for the key algorithm.
|
| - """
|
| - def blankPublicKeys(proto2):
|
| - proto2.supportedPublicKeys = []
|
| - self.connectModifiedProtocol(blankPublicKeys)
|
| -
|
| -
|
| - def test_disconnectIfCantMatchCompression(self):
|
| - """
|
| - Like test_disconnectIfCantMatchKex, but for the compression.
|
| - """
|
| - def blankCompressions(proto2):
|
| - proto2.supportedCompressions = []
|
| - self.connectModifiedProtocol(blankCompressions)
|
| -
|
| -
|
| - def test_disconnectIfCantMatchCipher(self):
|
| - """
|
| - Like test_disconnectIfCantMatchKex, but for the encryption.
|
| - """
|
| - def blankCiphers(proto2):
|
| - proto2.supportedCiphers = []
|
| - self.connectModifiedProtocol(blankCiphers)
|
| -
|
| -
|
| - def test_disconnectIfCantMatchMAC(self):
|
| - """
|
| - Like test_disconnectIfCantMatchKex, but for the MAC.
|
| - """
|
| - def blankMACs(proto2):
|
| - proto2.supportedMACs = []
|
| - self.connectModifiedProtocol(blankMACs)
|
| -
|
| -
|
| -
|
| -class ServerSSHTransportTestCase(ServerAndClientSSHTransportBaseCase,
|
| - TransportTestCase):
|
| - """
|
| - Tests for the SSHServerTransport.
|
| - """
|
| -
|
| - klass = transport.SSHServerTransport
|
| -
|
| -
|
| - def setUp(self):
|
| - TransportTestCase.setUp(self)
|
| - self.proto.factory = MockFactory()
|
| - self.proto.factory.startFactory()
|
| -
|
| -
|
| - def tearDown(self):
|
| - TransportTestCase.tearDown(self)
|
| - self.proto.factory.stopFactory()
|
| - del self.proto.factory
|
| -
|
| -
|
| - def test_KEXINIT(self):
|
| - """
|
| - Test that receiving a KEXINIT packet sets up the correct values on the
|
| - server.
|
| - """
|
| - self.proto.dataReceived( 'SSH-2.0-Twisted\r\n\x00\x00\x01\xd4\t\x14'
|
| - '\x99\x99\x99\x99\x99\x99\x99\x99\x99\x99\x99\x99\x99\x99\x99'
|
| - '\x99\x00\x00\x00=diffie-hellman-group1-sha1,diffie-hellman-g'
|
| - 'roup-exchange-sha1\x00\x00\x00\x0fssh-dss,ssh-rsa\x00\x00\x00'
|
| - '\x85aes128-ctr,aes128-cbc,aes192-ctr,aes192-cbc,aes256-ctr,ae'
|
| - 's256-cbc,cast128-ctr,cast128-cbc,blowfish-ctr,blowfish-cbc,3d'
|
| - 'es-ctr,3des-cbc\x00\x00\x00\x85aes128-ctr,aes128-cbc,aes192-c'
|
| - 'tr,aes192-cbc,aes256-ctr,aes256-cbc,cast128-ctr,cast128-cbc,b'
|
| - 'lowfish-ctr,blowfish-cbc,3des-ctr,3des-cbc\x00\x00\x00\x12hma'
|
| - 'c-md5,hmac-sha1\x00\x00\x00\x12hmac-md5,hmac-sha1\x00\x00\x00'
|
| - '\tnone,zlib\x00\x00\x00\tnone,zlib\x00\x00\x00\x00\x00\x00'
|
| - '\x00\x00\x00\x00\x00\x00\x00\x99\x99\x99\x99\x99\x99\x99\x99'
|
| - '\x99')
|
| - self.assertEquals(self.proto.kexAlg,
|
| - 'diffie-hellman-group1-sha1')
|
| - self.assertEquals(self.proto.keyAlg,
|
| - 'ssh-dss')
|
| - self.assertEquals(self.proto.outgoingCompressionType,
|
| - 'none')
|
| - self.assertEquals(self.proto.incomingCompressionType,
|
| - 'none')
|
| - ne = self.proto.nextEncryptions
|
| - self.assertEquals(ne.outCipType, 'aes128-ctr')
|
| - self.assertEquals(ne.inCipType, 'aes128-ctr')
|
| - self.assertEquals(ne.outMACType, 'hmac-md5')
|
| - self.assertEquals(ne.inMACType, 'hmac-md5')
|
| -
|
| -
|
| - def test_ignoreGuessPacketKex(self):
|
| - """
|
| - The client is allowed to send a guessed key exchange packet
|
| - after it sends the KEXINIT packet. However, if the key exchanges
|
| - do not match, that guess packet must be ignored. This tests that
|
| - the packet is ignored in the case of the key exchange method not
|
| - matching.
|
| - """
|
| - kexInitPacket = '\x00' * 16 + (
|
| - ''.join([common.NS(x) for x in
|
| - [','.join(y) for y in
|
| - [self.proto.supportedKeyExchanges[::-1],
|
| - self.proto.supportedPublicKeys,
|
| - self.proto.supportedCiphers,
|
| - self.proto.supportedCiphers,
|
| - self.proto.supportedMACs,
|
| - self.proto.supportedMACs,
|
| - self.proto.supportedCompressions,
|
| - self.proto.supportedCompressions,
|
| - self.proto.supportedLanguages,
|
| - self.proto.supportedLanguages]]])) + (
|
| - '\xff\x00\x00\x00\x00')
|
| - self.proto.ssh_KEXINIT(kexInitPacket)
|
| - self.assertTrue(self.proto.ignoreNextPacket)
|
| - self.proto.ssh_DEBUG("\x01\x00\x00\x00\x04test\x00\x00\x00\x00")
|
| - self.assertTrue(self.proto.ignoreNextPacket)
|
| -
|
| -
|
| - self.proto.ssh_KEX_DH_GEX_REQUEST_OLD('\x00\x00\x08\x00')
|
| - self.assertFalse(self.proto.ignoreNextPacket)
|
| - self.assertEquals(self.packets, [])
|
| - self.proto.ignoreNextPacket = True
|
| -
|
| - self.proto.ssh_KEX_DH_GEX_REQUEST('\x00\x00\x08\x00' * 3)
|
| - self.assertFalse(self.proto.ignoreNextPacket)
|
| - self.assertEquals(self.packets, [])
|
| -
|
| -
|
| - def test_ignoreGuessPacketKey(self):
|
| - """
|
| - Like test_ignoreGuessPacketKex, but for an incorrectly guessed
|
| - public key format.
|
| - """
|
| - kexInitPacket = '\x00' * 16 + (
|
| - ''.join([common.NS(x) for x in
|
| - [','.join(y) for y in
|
| - [self.proto.supportedKeyExchanges,
|
| - self.proto.supportedPublicKeys[::-1],
|
| - self.proto.supportedCiphers,
|
| - self.proto.supportedCiphers,
|
| - self.proto.supportedMACs,
|
| - self.proto.supportedMACs,
|
| - self.proto.supportedCompressions,
|
| - self.proto.supportedCompressions,
|
| - self.proto.supportedLanguages,
|
| - self.proto.supportedLanguages]]])) + (
|
| - '\xff\x00\x00\x00\x00')
|
| - self.proto.ssh_KEXINIT(kexInitPacket)
|
| - self.assertTrue(self.proto.ignoreNextPacket)
|
| - self.proto.ssh_DEBUG("\x01\x00\x00\x00\x04test\x00\x00\x00\x00")
|
| - self.assertTrue(self.proto.ignoreNextPacket)
|
| -
|
| - self.proto.ssh_KEX_DH_GEX_REQUEST_OLD('\x00\x00\x08\x00')
|
| - self.assertFalse(self.proto.ignoreNextPacket)
|
| - self.assertEquals(self.packets, [])
|
| - self.proto.ignoreNextPacket = True
|
| -
|
| - self.proto.ssh_KEX_DH_GEX_REQUEST('\x00\x00\x08\x00' * 3)
|
| - self.assertFalse(self.proto.ignoreNextPacket)
|
| - self.assertEquals(self.packets, [])
|
| -
|
| -
|
| - def test_KEXDH_INIT(self):
|
| - """
|
| - Test that the KEXDH_INIT packet causes the server to send a
|
| - KEXDH_REPLY with the server's public key and a signature.
|
| - """
|
| - self.proto.supportedKeyExchanges = ['diffie-hellman-group1-sha1']
|
| - self.proto.supportedPublicKeys = ['ssh-rsa']
|
| - self.proto.dataReceived(self.transport.value())
|
| - e = pow(transport.DH_GENERATOR, 5000,
|
| - transport.DH_PRIME)
|
| -
|
| - self.proto.ssh_KEX_DH_GEX_REQUEST_OLD(common.MP(e))
|
| - y = common.getMP('\x00\x00\x00\x40' + '\x99' * 64)[0]
|
| - f = common._MPpow(transport.DH_GENERATOR, y, transport.DH_PRIME)
|
| - sharedSecret = common._MPpow(e, y, transport.DH_PRIME)
|
| -
|
| - h = sha.new()
|
| - h.update(common.NS(self.proto.ourVersionString) * 2)
|
| - h.update(common.NS(self.proto.ourKexInitPayload) * 2)
|
| - h.update(common.NS(self.proto.factory.publicKeys['ssh-rsa'].blob()))
|
| - h.update(common.MP(e))
|
| - h.update(f)
|
| - h.update(sharedSecret)
|
| - exchangeHash = h.digest()
|
| -
|
| - signature = self.proto.factory.privateKeys['ssh-rsa'].sign(
|
| - exchangeHash)
|
| -
|
| - self.assertEquals(
|
| - self.packets,
|
| - [(transport.MSG_KEXDH_REPLY,
|
| - common.NS(self.proto.factory.publicKeys['ssh-rsa'].blob())
|
| - + f + common.NS(signature)),
|
| - (transport.MSG_NEWKEYS, '')])
|
| -
|
| -
|
| - def test_KEX_DH_GEX_REQUEST_OLD(self):
|
| - """
|
| - Test that the KEX_DH_GEX_REQUEST_OLD message causes the server
|
| - to reply with a KEX_DH_GEX_GROUP message with the correct
|
| - Diffie-Hellman group.
|
| - """
|
| - self.proto.supportedKeyExchanges = [
|
| - 'diffie-hellman-group-exchange-sha1']
|
| - self.proto.supportedPublicKeys = ['ssh-rsa']
|
| - self.proto.dataReceived(self.transport.value())
|
| - self.proto.ssh_KEX_DH_GEX_REQUEST_OLD('\x00\x00\x04\x00')
|
| - self.assertEquals(
|
| - self.packets,
|
| - [(transport.MSG_KEX_DH_GEX_GROUP,
|
| - common.MP(transport.DH_PRIME) + '\x00\x00\x00\x01\x02')])
|
| - self.assertEquals(self.proto.g, 2)
|
| - self.assertEquals(self.proto.p, transport.DH_PRIME)
|
| -
|
| -
|
| - def test_KEX_DH_GEX_REQUEST_OLD_badKexAlg(self):
|
| - """
|
| - Test that if the server recieves a KEX_DH_GEX_REQUEST_OLD message
|
| - and the key exchange algorithm is not 'diffie-hellman-group1-sha1' or
|
| - 'diffie-hellman-group-exchange-sha1', we raise a ConchError.
|
| - """
|
| - self.proto.kexAlg = None
|
| - self.assertRaises(ConchError, self.proto.ssh_KEX_DH_GEX_REQUEST_OLD,
|
| - None)
|
| -
|
| -
|
| - def test_KEX_DH_GEX_REQUEST(self):
|
| - """
|
| - Test that the KEX_DH_GEX_REQUEST message causes the server to reply
|
| - with a KEX_DH_GEX_GROUP message with the correct Diffie-Hellman
|
| - group.
|
| - """
|
| - self.proto.supportedKeyExchanges = [
|
| - 'diffie-hellman-group-exchange-sha1']
|
| - self.proto.supportedPublicKeys = ['ssh-rsa']
|
| - self.proto.dataReceived(self.transport.value())
|
| - self.proto.ssh_KEX_DH_GEX_REQUEST('\x00\x00\x04\x00\x00\x00\x08\x00' +
|
| - '\x00\x00\x0c\x00')
|
| - self.assertEquals(
|
| - self.packets,
|
| - [(transport.MSG_KEX_DH_GEX_GROUP,
|
| - common.MP(transport.DH_PRIME) + '\x00\x00\x00\x01\x03')])
|
| - self.assertEquals(self.proto.g, 3)
|
| - self.assertEquals(self.proto.p, transport.DH_PRIME)
|
| -
|
| -
|
| - def test_KEX_DH_GEX_INIT_after_REQUEST(self):
|
| - """
|
| - Test that the KEX_DH_GEX_INIT message after the client sends
|
| - KEX_DH_GEX_REQUEST causes the server to send a KEX_DH_GEX_INIT message
|
| - with a public key and signature.
|
| - """
|
| - self.test_KEX_DH_GEX_REQUEST()
|
| - e = pow(self.proto.g, 3, self.proto.p)
|
| - y = common.getMP('\x00\x00\x00\x80' + '\x99' * 128)[0]
|
| - f = common._MPpow(self.proto.g, y, self.proto.p)
|
| - sharedSecret = common._MPpow(e, y, self.proto.p)
|
| - h = sha.new()
|
| - h.update(common.NS(self.proto.ourVersionString) * 2)
|
| - h.update(common.NS(self.proto.ourKexInitPayload) * 2)
|
| - h.update(common.NS(self.proto.factory.publicKeys['ssh-rsa'].blob()))
|
| - h.update('\x00\x00\x04\x00\x00\x00\x08\x00\x00\x00\x0c\x00')
|
| - h.update(common.MP(self.proto.p))
|
| - h.update(common.MP(self.proto.g))
|
| - h.update(common.MP(e))
|
| - h.update(f)
|
| - h.update(sharedSecret)
|
| - exchangeHash = h.digest()
|
| - self.proto.ssh_KEX_DH_GEX_INIT(common.MP(e))
|
| - self.assertEquals(
|
| - self.packets[1],
|
| - (transport.MSG_KEX_DH_GEX_REPLY,
|
| - common.NS(self.proto.factory.publicKeys['ssh-rsa'].blob()) +
|
| - f + common.NS(self.proto.factory.privateKeys['ssh-rsa'].sign(
|
| - exchangeHash))))
|
| -
|
| -
|
| - def test_KEX_DH_GEX_INIT_after_REQUEST_OLD(self):
|
| - """
|
| - Test that the KEX_DH_GEX_INIT message after the client sends
|
| - KEX_DH_GEX_REQUEST_OLD causes the server to sent a KEX_DH_GEX_INIT
|
| - message with a public key and signature.
|
| - """
|
| - self.test_KEX_DH_GEX_REQUEST_OLD()
|
| - e = pow(self.proto.g, 3, self.proto.p)
|
| - y = common.getMP('\x00\x00\x00\x80' + '\x99' * 128)[0]
|
| - f = common._MPpow(self.proto.g, y, self.proto.p)
|
| - sharedSecret = common._MPpow(e, y, self.proto.p)
|
| - h = sha.new()
|
| - h.update(common.NS(self.proto.ourVersionString) * 2)
|
| - h.update(common.NS(self.proto.ourKexInitPayload) * 2)
|
| - h.update(common.NS(self.proto.factory.publicKeys['ssh-rsa'].blob()))
|
| - h.update('\x00\x00\x04\x00')
|
| - h.update(common.MP(self.proto.p))
|
| - h.update(common.MP(self.proto.g))
|
| - h.update(common.MP(e))
|
| - h.update(f)
|
| - h.update(sharedSecret)
|
| - exchangeHash = h.digest()
|
| - self.proto.ssh_KEX_DH_GEX_INIT(common.MP(e))
|
| - self.assertEquals(
|
| - self.packets[1:],
|
| - [(transport.MSG_KEX_DH_GEX_REPLY,
|
| - common.NS(self.proto.factory.publicKeys['ssh-rsa'].blob()) +
|
| - f + common.NS(self.proto.factory.privateKeys['ssh-rsa'].sign(
|
| - exchangeHash))),
|
| - (transport.MSG_NEWKEYS, '')])
|
| -
|
| -
|
| - def test_keySetup(self):
|
| - """
|
| - Test that _keySetup sets up the next encryption keys.
|
| - """
|
| - self.proto.nextEncryptions = MockCipher()
|
| - self.proto._keySetup('AB', 'CD')
|
| - self.assertEquals(self.proto.sessionID, 'CD')
|
| - self.proto._keySetup('AB', 'EF')
|
| - self.assertEquals(self.proto.sessionID, 'CD')
|
| - self.assertEquals(self.packets[-1], (transport.MSG_NEWKEYS, ''))
|
| - newKeys = [self.proto._getKey(c, 'AB', 'EF') for c in 'ABCDEF']
|
| - self.assertEquals(
|
| - self.proto.nextEncryptions.keys,
|
| - (newKeys[1], newKeys[3], newKeys[0], newKeys[2], newKeys[5],
|
| - newKeys[4]))
|
| -
|
| -
|
| - def test_NEWKEYS(self):
|
| - """
|
| - Test that NEWKEYS transitions the keys in nextEncryptions to
|
| - currentEncryptions.
|
| - """
|
| - self.test_KEXINIT()
|
| -
|
| - self.proto.nextEncryptions = transport.SSHCiphers('none', 'none',
|
| - 'none', 'none')
|
| - self.proto.ssh_NEWKEYS('')
|
| - self.assertIdentical(self.proto.currentEncryptions,
|
| - self.proto.nextEncryptions)
|
| - self.assertIdentical(self.proto.outgoingCompression, None)
|
| - self.assertIdentical(self.proto.incomingCompression, None)
|
| - self.proto.outgoingCompressionType = 'zlib'
|
| - self.proto.ssh_NEWKEYS('')
|
| - self.failIfIdentical(self.proto.outgoingCompression, None)
|
| - self.proto.incomingCompressionType = 'zlib'
|
| - self.proto.ssh_NEWKEYS('')
|
| - self.failIfIdentical(self.proto.incomingCompression, None)
|
| -
|
| -
|
| - def test_SERVICE_REQUEST(self):
|
| - """
|
| - Test that the SERVICE_REQUEST message requests and starts a
|
| - service.
|
| - """
|
| - self.proto.ssh_SERVICE_REQUEST(common.NS('ssh-userauth'))
|
| - self.assertEquals(self.packets, [(transport.MSG_SERVICE_ACCEPT,
|
| - common.NS('ssh-userauth'))])
|
| - self.assertEquals(self.proto.service.name, 'MockService')
|
| -
|
| -
|
| - def test_disconnectNEWKEYSData(self):
|
| - """
|
| - Test that NEWKEYS disconnects if it receives data.
|
| - """
|
| - self.proto.ssh_NEWKEYS("bad packet")
|
| - self.checkDisconnected()
|
| -
|
| -
|
| - def test_disconnectSERVICE_REQUESTBadService(self):
|
| - """
|
| - Test that SERVICE_REQUESTS disconnects if an unknown service is
|
| - requested.
|
| - """
|
| - self.proto.ssh_SERVICE_REQUEST(common.NS('no service'))
|
| - self.checkDisconnected(transport.DISCONNECT_SERVICE_NOT_AVAILABLE)
|
| -
|
| -
|
| -
|
| -class ClientSSHTransportTestCase(ServerAndClientSSHTransportBaseCase,
|
| - TransportTestCase):
|
| - """
|
| - Tests for SSHClientTransport.
|
| - """
|
| -
|
| - klass = transport.SSHClientTransport
|
| -
|
| -
|
| - def test_KEXINIT(self):
|
| - """
|
| - Test that receiving a KEXINIT packet sets up the correct values on the
|
| - client. The way algorithms are picks is that the first item in the
|
| - client's list that is also in the server's list is chosen.
|
| - """
|
| - self.proto.dataReceived( 'SSH-2.0-Twisted\r\n\x00\x00\x01\xd4\t\x14'
|
| - '\x99\x99\x99\x99\x99\x99\x99\x99\x99\x99\x99\x99\x99\x99\x99'
|
| - '\x99\x00\x00\x00=diffie-hellman-group1-sha1,diffie-hellman-g'
|
| - 'roup-exchange-sha1\x00\x00\x00\x0fssh-dss,ssh-rsa\x00\x00\x00'
|
| - '\x85aes128-ctr,aes128-cbc,aes192-ctr,aes192-cbc,aes256-ctr,ae'
|
| - 's256-cbc,cast128-ctr,cast128-cbc,blowfish-ctr,blowfish-cbc,3d'
|
| - 'es-ctr,3des-cbc\x00\x00\x00\x85aes128-ctr,aes128-cbc,aes192-c'
|
| - 'tr,aes192-cbc,aes256-ctr,aes256-cbc,cast128-ctr,cast128-cbc,b'
|
| - 'lowfish-ctr,blowfish-cbc,3des-ctr,3des-cbc\x00\x00\x00\x12hma'
|
| - 'c-md5,hmac-sha1\x00\x00\x00\x12hmac-md5,hmac-sha1\x00\x00\x00'
|
| - '\tzlib,none\x00\x00\x00\tzlib,none\x00\x00\x00\x00\x00\x00'
|
| - '\x00\x00\x00\x00\x00\x00\x00\x99\x99\x99\x99\x99\x99\x99\x99'
|
| - '\x99')
|
| - self.assertEquals(self.proto.kexAlg,
|
| - 'diffie-hellman-group-exchange-sha1')
|
| - self.assertEquals(self.proto.keyAlg,
|
| - 'ssh-rsa')
|
| - self.assertEquals(self.proto.outgoingCompressionType,
|
| - 'none')
|
| - self.assertEquals(self.proto.incomingCompressionType,
|
| - 'none')
|
| - ne = self.proto.nextEncryptions
|
| - self.assertEquals(ne.outCipType, 'aes256-ctr')
|
| - self.assertEquals(ne.inCipType, 'aes256-ctr')
|
| - self.assertEquals(ne.outMACType, 'hmac-sha1')
|
| - self.assertEquals(ne.inMACType, 'hmac-sha1')
|
| -
|
| -
|
| - def verifyHostKey(self, pubKey, fingerprint):
|
| - """
|
| - Mock version of SSHClientTransport.verifyHostKey.
|
| - """
|
| - self.calledVerifyHostKey = True
|
| - self.assertEquals(pubKey, self.blob)
|
| - self.assertEquals(fingerprint.replace(':', ''),
|
| - md5.new(pubKey).hexdigest())
|
| - return defer.succeed(True)
|
| -
|
| -
|
| - def setUp(self):
|
| - TransportTestCase.setUp(self)
|
| - self.blob = keys.Key.fromString(keydata.publicRSA_openssh).blob()
|
| - self.privObj = keys.Key.fromString(keydata.privateRSA_openssh)
|
| - self.calledVerifyHostKey = False
|
| - self.proto.verifyHostKey = self.verifyHostKey
|
| -
|
| -
|
| - def test_notImplementedClientMethods(self):
|
| - """
|
| - verifyHostKey() should return a Deferred which fails with a
|
| - NotImplementedError exception. connectionSecure() should raise
|
| - NotImplementedError().
|
| - """
|
| - self.assertRaises(NotImplementedError, self.klass().connectionSecure)
|
| - def _checkRaises(f):
|
| - f.trap(NotImplementedError)
|
| - d = self.klass().verifyHostKey(None, None)
|
| - return d.addCallback(self.fail).addErrback(_checkRaises)
|
| -
|
| -
|
| - def test_KEXINIT_groupexchange(self):
|
| - """
|
| - Test that a KEXINIT packet with a group-exchange key exchange results
|
| - in a KEX_DH_GEX_REQUEST_OLD message..
|
| - """
|
| - self.proto.supportedKeyExchanges = [
|
| - 'diffie-hellman-group-exchange-sha1']
|
| - self.proto.dataReceived(self.transport.value())
|
| - self.assertEquals(self.packets, [(transport.MSG_KEX_DH_GEX_REQUEST_OLD,
|
| - '\x00\x00\x08\x00')])
|
| -
|
| -
|
| - def test_KEXINIT_group1(self):
|
| - """
|
| - Like test_KEXINIT_groupexchange, but for the group-1 key exchange.
|
| - """
|
| - self.proto.supportedKeyExchanges = ['diffie-hellman-group1-sha1']
|
| - self.proto.dataReceived(self.transport.value())
|
| - self.assertEquals(common.MP(self.proto.x)[5:], '\x99' * 64)
|
| - self.assertEquals(self.packets,
|
| - [(transport.MSG_KEXDH_INIT, self.proto.e)])
|
| -
|
| -
|
| - def test_KEXINIT_badKexAlg(self):
|
| - """
|
| - Test that the client raises a ConchError if it receives a
|
| - KEXINIT message bug doesn't have a key exchange algorithm that we
|
| - understand.
|
| - """
|
| - self.proto.supportedKeyExchanges = ['diffie-hellman-group2-sha1']
|
| - data = self.transport.value().replace('group1', 'group2')
|
| - self.assertRaises(ConchError, self.proto.dataReceived, data)
|
| -
|
| -
|
| - def test_KEXDH_REPLY(self):
|
| - """
|
| - Test that the KEXDH_REPLY message verifies the server.
|
| - """
|
| - self.test_KEXINIT_group1()
|
| -
|
| - sharedSecret = common._MPpow(transport.DH_GENERATOR,
|
| - self.proto.x, transport.DH_PRIME)
|
| - h = sha.new()
|
| - h.update(common.NS(self.proto.ourVersionString) * 2)
|
| - h.update(common.NS(self.proto.ourKexInitPayload) * 2)
|
| - h.update(common.NS(self.blob))
|
| - h.update(self.proto.e)
|
| - h.update('\x00\x00\x00\x01\x02') # f
|
| - h.update(sharedSecret)
|
| - exchangeHash = h.digest()
|
| -
|
| - def _cbTestKEXDH_REPLY(value):
|
| - self.assertIdentical(value, None)
|
| - self.assertEquals(self.calledVerifyHostKey, True)
|
| - self.assertEquals(self.proto.sessionID, exchangeHash)
|
| -
|
| - signature = self.privObj.sign(exchangeHash)
|
| -
|
| - d = self.proto.ssh_KEX_DH_GEX_GROUP(
|
| - (common.NS(self.blob) + '\x00\x00\x00\x01\x02' +
|
| - common.NS(signature)))
|
| - d.addCallback(_cbTestKEXDH_REPLY)
|
| -
|
| - return d
|
| -
|
| -
|
| - def test_KEX_DH_GEX_GROUP(self):
|
| - """
|
| - Test that the KEX_DH_GEX_GROUP message results in a
|
| - KEX_DH_GEX_INIT message with the client's Diffie-Hellman public key.
|
| - """
|
| - self.test_KEXINIT_groupexchange()
|
| - self.proto.ssh_KEX_DH_GEX_GROUP(
|
| - '\x00\x00\x00\x01\x0f\x00\x00\x00\x01\x02')
|
| - self.assertEquals(self.proto.p, 15)
|
| - self.assertEquals(self.proto.g, 2)
|
| - self.assertEquals(common.MP(self.proto.x)[5:], '\x99' * 40)
|
| - self.assertEquals(self.proto.e,
|
| - common.MP(pow(2, self.proto.x, 15)))
|
| - self.assertEquals(self.packets[1:], [(transport.MSG_KEX_DH_GEX_INIT,
|
| - self.proto.e)])
|
| -
|
| -
|
| - def test_KEX_DH_GEX_REPLY(self):
|
| - """
|
| - Test that the KEX_DH_GEX_REPLY message results in a verified
|
| - server.
|
| - """
|
| -
|
| - self.test_KEX_DH_GEX_GROUP()
|
| - sharedSecret = common._MPpow(3, self.proto.x, self.proto.p)
|
| - h = sha.new()
|
| - h.update(common.NS(self.proto.ourVersionString) * 2)
|
| - h.update(common.NS(self.proto.ourKexInitPayload) * 2)
|
| - h.update(common.NS(self.blob))
|
| - h.update('\x00\x00\x08\x00\x00\x00\x00\x01\x0f\x00\x00\x00\x01\x02')
|
| - h.update(self.proto.e)
|
| - h.update('\x00\x00\x00\x01\x03') # f
|
| - h.update(sharedSecret)
|
| - exchangeHash = h.digest()
|
| -
|
| - def _cbTestKEX_DH_GEX_REPLY(value):
|
| - self.assertIdentical(value, None)
|
| - self.assertEquals(self.calledVerifyHostKey, True)
|
| - self.assertEquals(self.proto.sessionID, exchangeHash)
|
| -
|
| - signature = self.privObj.sign(exchangeHash)
|
| -
|
| - d = self.proto.ssh_KEX_DH_GEX_REPLY(
|
| - common.NS(self.blob) +
|
| - '\x00\x00\x00\x01\x03' +
|
| - common.NS(signature))
|
| - d.addCallback(_cbTestKEX_DH_GEX_REPLY)
|
| - return d
|
| -
|
| -
|
| - def test_keySetup(self):
|
| - """
|
| - Test that _keySetup sets up the next encryption keys.
|
| - """
|
| - self.proto.nextEncryptions = MockCipher()
|
| - self.proto._keySetup('AB', 'CD')
|
| - self.assertEquals(self.proto.sessionID, 'CD')
|
| - self.proto._keySetup('AB', 'EF')
|
| - self.assertEquals(self.proto.sessionID, 'CD')
|
| - self.assertEquals(self.packets[-1], (transport.MSG_NEWKEYS, ''))
|
| - newKeys = [self.proto._getKey(c, 'AB', 'EF') for c in 'ABCDEF']
|
| - self.assertEquals(self.proto.nextEncryptions.keys,
|
| - (newKeys[0], newKeys[2], newKeys[1], newKeys[3],
|
| - newKeys[4], newKeys[5]))
|
| -
|
| -
|
| - def test_NEWKEYS(self):
|
| - """
|
| - Test that NEWKEYS transitions the keys from nextEncryptions to
|
| - currentEncryptions.
|
| - """
|
| - self.test_KEXINIT()
|
| - secure = [False]
|
| - def stubConnectionSecure():
|
| - secure[0] = True
|
| - self.proto.connectionSecure = stubConnectionSecure
|
| -
|
| - self.proto.nextEncryptions = transport.SSHCiphers('none', 'none',
|
| - 'none', 'none')
|
| - self.proto.ssh_NEWKEYS('')
|
| -
|
| - self.failIfIdentical(self.proto.currentEncryptions,
|
| - self.proto.nextEncryptions)
|
| -
|
| - self.proto.nextEncryptions = MockCipher()
|
| - self.proto._keySetup('AB', 'EF')
|
| - self.assertIdentical(self.proto.outgoingCompression, None)
|
| - self.assertIdentical(self.proto.incomingCompression, None)
|
| - self.assertIdentical(self.proto.currentEncryptions,
|
| - self.proto.nextEncryptions)
|
| - self.assertTrue(secure[0])
|
| - self.proto.outgoingCompressionType = 'zlib'
|
| - self.proto.ssh_NEWKEYS('')
|
| - self.failIfIdentical(self.proto.outgoingCompression, None)
|
| - self.proto.incomingCompressionType = 'zlib'
|
| - self.proto.ssh_NEWKEYS('')
|
| - self.failIfIdentical(self.proto.incomingCompression, None)
|
| -
|
| -
|
| - def test_SERVICE_ACCEPT(self):
|
| - """
|
| - Test that the SERVICE_ACCEPT packet starts the requested service.
|
| - """
|
| - self.proto.instance = MockService()
|
| - self.proto.ssh_SERVICE_ACCEPT('\x00\x00\x00\x0bMockService')
|
| - self.assertTrue(self.proto.instance.started)
|
| -
|
| -
|
| - def test_requestService(self):
|
| - """
|
| - Test that requesting a service sends a SERVICE_REQUEST packet.
|
| - """
|
| - self.proto.requestService(MockService())
|
| - self.assertEquals(self.packets, [(transport.MSG_SERVICE_REQUEST,
|
| - '\x00\x00\x00\x0bMockService')])
|
| -
|
| -
|
| - def test_disconnectKEXDH_REPLYBadSignature(self):
|
| - """
|
| - Test that KEXDH_REPLY disconnects if the signature is bad.
|
| - """
|
| - self.test_KEXDH_REPLY()
|
| - self.proto._continueKEXDH_REPLY(None, self.blob, 3, "bad signature")
|
| - self.checkDisconnected(transport.DISCONNECT_KEY_EXCHANGE_FAILED)
|
| -
|
| -
|
| - def test_disconnectGEX_REPLYBadSignature(self):
|
| - """
|
| - Like test_disconnectKEXDH_REPLYBadSignature, but for DH_GEX_REPLY.
|
| - """
|
| - self.test_KEX_DH_GEX_REPLY()
|
| - self.proto._continueGEX_REPLY(None, self.blob, 3, "bad signature")
|
| - self.checkDisconnected(transport.DISCONNECT_KEY_EXCHANGE_FAILED)
|
| -
|
| -
|
| - def test_disconnectNEWKEYSData(self):
|
| - """
|
| - Test that NEWKEYS disconnects if it receives data.
|
| - """
|
| - self.proto.ssh_NEWKEYS("bad packet")
|
| - self.checkDisconnected()
|
| -
|
| -
|
| - def test_disconnectSERVICE_ACCEPT(self):
|
| - """
|
| - Test that SERVICE_ACCEPT disconnects if the accepted protocol is
|
| - differet from the asked-for protocol.
|
| - """
|
| - self.proto.instance = MockService()
|
| - self.proto.ssh_SERVICE_ACCEPT('\x00\x00\x00\x03bad')
|
| - self.checkDisconnected()
|
| -
|
| -
|
| -
|
| -class SSHCiphersTestCase(unittest.TestCase):
|
| - """
|
| - Tests for the SSHCiphers helper class.
|
| - """
|
| - if Crypto is None:
|
| - skip = "cannot run w/o PyCrypto"
|
| -
|
| -
|
| - def test_init(self):
|
| - """
|
| - Test that the initializer sets up the SSHCiphers object.
|
| - """
|
| - ciphers = transport.SSHCiphers('A', 'B', 'C', 'D')
|
| - self.assertEquals(ciphers.outCipType, 'A')
|
| - self.assertEquals(ciphers.inCipType, 'B')
|
| - self.assertEquals(ciphers.outMACType, 'C')
|
| - self.assertEquals(ciphers.inMACType, 'D')
|
| -
|
| -
|
| - def test_getCipher(self):
|
| - """
|
| - Test that the _getCipher method returns the correct cipher.
|
| - """
|
| - ciphers = transport.SSHCiphers('A', 'B', 'C', 'D')
|
| - iv = key = '\x00' * 16
|
| - for cipName, (modName, keySize, counter) in ciphers.cipherMap.items():
|
| - cip = ciphers._getCipher(cipName, iv, key)
|
| - if cipName == 'none':
|
| - self.assertIsInstance(cip, transport._DummyCipher)
|
| - else:
|
| - self.assertTrue(str(cip).startswith('<' + modName))
|
| -
|
| -
|
| - def test_getMAC(self):
|
| - """
|
| - Test that the _getMAC method returns the correct MAC.
|
| - """
|
| - ciphers = transport.SSHCiphers('A', 'B', 'C', 'D')
|
| - key = '\x00' * 64
|
| - for macName, mac in ciphers.macMap.items():
|
| - mod = ciphers._getMAC(macName, key)
|
| - if macName == 'none':
|
| - self.assertIdentical(mac, None)
|
| - else:
|
| - self.assertEquals(mod[0], mac)
|
| - self.assertEquals(mod[1],
|
| - Crypto.Cipher.XOR.new('\x36').encrypt(key))
|
| - self.assertEquals(mod[2],
|
| - Crypto.Cipher.XOR.new('\x5c').encrypt(key))
|
| - self.assertEquals(mod[3], len(mod[0].new().digest()))
|
| -
|
| -
|
| - def test_setKeysCiphers(self):
|
| - """
|
| - Test that setKeys sets up the ciphers.
|
| - """
|
| - key = '\x00' * 64
|
| - cipherItems = transport.SSHCiphers.cipherMap.items()
|
| - for cipName, (modName, keySize, counter) in cipherItems:
|
| - encCipher = transport.SSHCiphers(cipName, 'none', 'none', 'none')
|
| - decCipher = transport.SSHCiphers('none', cipName, 'none', 'none')
|
| - cip = encCipher._getCipher(cipName, key, key)
|
| - bs = cip.block_size
|
| - encCipher.setKeys(key, key, '', '', '', '')
|
| - decCipher.setKeys('', '', key, key, '', '')
|
| - self.assertEquals(encCipher.encBlockSize, bs)
|
| - self.assertEquals(decCipher.decBlockSize, bs)
|
| - enc = cip.encrypt(key[:bs])
|
| - enc2 = cip.encrypt(key[:bs])
|
| - if counter:
|
| - self.failIfEquals(enc, enc2)
|
| - self.assertEquals(encCipher.encrypt(key[:bs]), enc)
|
| - self.assertEquals(encCipher.encrypt(key[:bs]), enc2)
|
| - self.assertEquals(decCipher.decrypt(enc), key[:bs])
|
| - self.assertEquals(decCipher.decrypt(enc2), key[:bs])
|
| -
|
| -
|
| - def test_setKeysMACs(self):
|
| - """
|
| - Test that setKeys sets up the MACs.
|
| - """
|
| - key = '\x00' * 64
|
| - for macName, mod in transport.SSHCiphers.macMap.items():
|
| - outMac = transport.SSHCiphers('none', 'none', macName, 'none')
|
| - inMac = transport.SSHCiphers('none', 'none', 'none', macName)
|
| - outMac.setKeys('', '', '', '', key, '')
|
| - inMac.setKeys('', '', '', '', '', key)
|
| - if mod:
|
| - ds = mod.digest_size
|
| - else:
|
| - ds = 0
|
| - self.assertEquals(inMac.verifyDigestSize, ds)
|
| - if mod:
|
| - mod, i, o, ds = outMac._getMAC(macName, key)
|
| - seqid = 0
|
| - data = key
|
| - packet = '\x00' * 4 + key
|
| - if mod:
|
| - mac = mod.new(o + mod.new(i + packet).digest()).digest()
|
| - else:
|
| - mac = ''
|
| - self.assertEquals(outMac.makeMAC(seqid, data), mac)
|
| - self.assertTrue(inMac.verify(seqid, data, mac))
|
| -
|
| -
|
| -
|
| -class CounterTestCase(unittest.TestCase):
|
| - """
|
| - Tests for the _Counter helper class.
|
| - """
|
| - if Crypto is None:
|
| - skip = "cannot run w/o PyCrypto"
|
| -
|
| -
|
| - def test_init(self):
|
| - """
|
| - Test that the counter is initialized correctly.
|
| - """
|
| - counter = transport._Counter('\x00' * 8 + '\xff' * 8, 8)
|
| - self.assertEquals(counter.blockSize, 8)
|
| - self.assertEquals(counter.count.tostring(), '\x00' * 8)
|
| -
|
| -
|
| - def test_count(self):
|
| - """
|
| - Test that the counter counts incrementally and wraps at the top.
|
| - """
|
| - counter = transport._Counter('\x00', 1)
|
| - self.assertEquals(counter(), '\x01')
|
| - self.assertEquals(counter(), '\x02')
|
| - [counter() for i in range(252)]
|
| - self.assertEquals(counter(), '\xff')
|
| - self.assertEquals(counter(), '\x00')
|
| -
|
| -
|
| -
|
| -class TransportLoopbackTestCase(unittest.TestCase):
|
| - """
|
| - Test the server transport and client transport against each other,
|
| - """
|
| - if Crypto is None:
|
| - skip = "cannot run w/o PyCrypto"
|
| -
|
| -
|
| - def _runClientServer(self, mod):
|
| - """
|
| - Run an async client and server, modifying each using the mod function
|
| - provided. Returns a Deferred called back when both Protocols have
|
| - disconnected.
|
| -
|
| - @type mod: C{func}
|
| - @rtype: C{defer.Deferred}
|
| - """
|
| - factory = MockFactory()
|
| - server = transport.SSHServerTransport()
|
| - server.factory = factory
|
| - factory.startFactory()
|
| - server.errors = []
|
| - server.receiveError = lambda code, desc: server.errors.append((
|
| - code, desc))
|
| - client = transport.SSHClientTransport()
|
| - client.verifyHostKey = lambda x, y: defer.succeed(None)
|
| - client.errors = []
|
| - client.receiveError = lambda code, desc: client.errors.append((
|
| - code, desc))
|
| - client.connectionSecure = lambda: client.loseConnection()
|
| - server = mod(server)
|
| - client = mod(client)
|
| - def check(ignored, server, client):
|
| - name = repr([server.supportedCiphers[0],
|
| - server.supportedMACs[0],
|
| - server.supportedKeyExchanges[0],
|
| - server.supportedCompressions[0]])
|
| - self.assertEquals(client.errors, [])
|
| - self.assertEquals(server.errors, [(
|
| - transport.DISCONNECT_CONNECTION_LOST,
|
| - "user closed connection")])
|
| - if server.supportedCiphers[0] == 'none':
|
| - self.assertFalse(server.isEncrypted(), name)
|
| - self.assertFalse(client.isEncrypted(), name)
|
| - else:
|
| - self.assertTrue(server.isEncrypted(), name)
|
| - self.assertTrue(client.isEncrypted(), name)
|
| - if server.supportedMACs[0] == 'none':
|
| - self.assertFalse(server.isVerified(), name)
|
| - self.assertFalse(client.isVerified(), name)
|
| - else:
|
| - self.assertTrue(server.isVerified(), name)
|
| - self.assertTrue(client.isVerified(), name)
|
| -
|
| - d = loopback.loopbackAsync(server, client)
|
| - d.addCallback(check, server, client)
|
| - return d
|
| -
|
| -
|
| - def test_ciphers(self):
|
| - """
|
| - Test that the client and server play nicely together, in all
|
| - the various combinations of ciphers.
|
| - """
|
| - deferreds = []
|
| - for cipher in transport.SSHTransportBase.supportedCiphers + ['none']:
|
| - def setCipher(proto):
|
| - proto.supportedCiphers = [cipher]
|
| - return proto
|
| - deferreds.append(self._runClientServer(setCipher))
|
| - return defer.DeferredList(deferreds, fireOnOneErrback=True)
|
| -
|
| -
|
| - def test_macs(self):
|
| - """
|
| - Like test_ciphers, but for the various MACs.
|
| - """
|
| - deferreds = []
|
| - for mac in transport.SSHTransportBase.supportedMACs + ['none']:
|
| - def setMAC(proto):
|
| - proto.supportedMACs = [mac]
|
| - return proto
|
| - deferreds.append(self._runClientServer(setMAC))
|
| - return defer.DeferredList(deferreds, fireOnOneErrback=True)
|
| -
|
| -
|
| - def test_keyexchanges(self):
|
| - """
|
| - Like test_ciphers, but for the various key exchanges.
|
| - """
|
| - deferreds = []
|
| - for kex in transport.SSHTransportBase.supportedKeyExchanges:
|
| - def setKeyExchange(proto):
|
| - proto.supportedKeyExchanges = [kex]
|
| - return proto
|
| - deferreds.append(self._runClientServer(setKeyExchange))
|
| - return defer.DeferredList(deferreds, fireOnOneErrback=True)
|
| -
|
| -
|
| - def test_compressions(self):
|
| - """
|
| - Like test_ciphers, but for the various compressions.
|
| - """
|
| - deferreds = []
|
| - for compression in transport.SSHTransportBase.supportedCompressions:
|
| - def setCompression(proto):
|
| - proto.supportedCompressions = [compression]
|
| - return proto
|
| - deferreds.append(self._runClientServer(setCompression))
|
| - return defer.DeferredList(deferreds, fireOnOneErrback=True)
|
| -
|
| -
|
| -
|
| -class OldFactoryTestCase(unittest.TestCase):
|
| - """
|
| - The old C{SSHFactory.getPublicKeys}() returned mappings of key names to
|
| - strings of key blobs and mappings of key names to PyCrypto key objects from
|
| - C{SSHFactory.getPrivateKeys}() (they could also be specified with the
|
| - C{publicKeys} and C{privateKeys} attributes). This is no longer supported
|
| - by the C{SSHServerTransport}, so we warn the user if they create an old
|
| - factory.
|
| - """
|
| -
|
| -
|
| - def test_getPublicKeysWarning(self):
|
| - """
|
| - If the return value of C{getPublicKeys}() isn't a mapping from key
|
| - names to C{Key} objects, then warn the user and convert the mapping.
|
| - """
|
| - sshFactory = MockOldFactoryPublicKeys()
|
| - self.assertWarns(DeprecationWarning,
|
| - "Returning a mapping from strings to strings from"
|
| - " getPublicKeys()/publicKeys (in %s) is deprecated. Return "
|
| - "a mapping from strings to Key objects instead." %
|
| - (qual(MockOldFactoryPublicKeys),),
|
| - factory.__file__, sshFactory.startFactory)
|
| - self.assertEquals(sshFactory.publicKeys, MockFactory().getPublicKeys())
|
| -
|
| -
|
| - def test_getPrivateKeysWarning(self):
|
| - """
|
| - If the return value of C{getPrivateKeys}() isn't a mapping from key
|
| - names to C{Key} objects, then warn the user and convert the mapping.
|
| - """
|
| - sshFactory = MockOldFactoryPrivateKeys()
|
| - self.assertWarns(DeprecationWarning,
|
| - "Returning a mapping from strings to PyCrypto key objects from"
|
| - " getPrivateKeys()/privateKeys (in %s) is deprecated. Return"
|
| - " a mapping from strings to Key objects instead." %
|
| - (qual(MockOldFactoryPrivateKeys),),
|
| - factory.__file__, sshFactory.startFactory)
|
| - self.assertEquals(sshFactory.privateKeys,
|
| - MockFactory().getPrivateKeys())
|
| -
|
| -
|
| - def test_publicKeysWarning(self):
|
| - """
|
| - If the value of the C{publicKeys} attribute isn't a mapping from key
|
| - names to C{Key} objects, then warn the user and convert the mapping.
|
| - """
|
| - sshFactory = MockOldFactoryPublicKeys()
|
| - sshFactory.publicKeys = sshFactory.getPublicKeys()
|
| - self.assertWarns(DeprecationWarning,
|
| - "Returning a mapping from strings to strings from"
|
| - " getPublicKeys()/publicKeys (in %s) is deprecated. Return "
|
| - "a mapping from strings to Key objects instead." %
|
| - (qual(MockOldFactoryPublicKeys),),
|
| - factory.__file__, sshFactory.startFactory)
|
| - self.assertEquals(sshFactory.publicKeys, MockFactory().getPublicKeys())
|
| -
|
| -
|
| - def test_privateKeysWarning(self):
|
| - """
|
| - If the return value of C{privateKeys} attribute isn't a mapping from
|
| - key names to C{Key} objects, then warn the user and convert the
|
| - mapping.
|
| - """
|
| - sshFactory = MockOldFactoryPrivateKeys()
|
| - sshFactory.privateKeys = sshFactory.getPrivateKeys()
|
| - self.assertWarns(DeprecationWarning,
|
| - "Returning a mapping from strings to PyCrypto key objects from"
|
| - " getPrivateKeys()/privateKeys (in %s) is deprecated. Return"
|
| - " a mapping from strings to Key objects instead." %
|
| - (qual(MockOldFactoryPrivateKeys),),
|
| - factory.__file__, sshFactory.startFactory)
|
| - self.assertEquals(sshFactory.privateKeys,
|
| - MockFactory().getPrivateKeys())
|
|
|