Index: third_party/twisted_8_1/twisted/test/test_ssl.py |
diff --git a/third_party/twisted_8_1/twisted/test/test_ssl.py b/third_party/twisted_8_1/twisted/test/test_ssl.py |
deleted file mode 100644 |
index 15892994ede6e0a967484d348044113abde1c690..0000000000000000000000000000000000000000 |
--- a/third_party/twisted_8_1/twisted/test/test_ssl.py |
+++ /dev/null |
@@ -1,431 +0,0 @@ |
-# Copyright (c) 2001-2007 Twisted Matrix Laboratories. |
-# See LICENSE for details. |
- |
- |
-from twisted.trial import unittest, util as trial_util |
-from twisted.internet import protocol, reactor, interfaces, defer |
-from twisted.protocols import basic |
-from twisted.python import util, log |
-from twisted.python.runtime import platform |
-from twisted.test.test_tcp import WriteDataTestCase, ProperlyCloseFilesMixin |
- |
-import os, errno |
- |
-try: |
- from OpenSSL import SSL, crypto |
- from twisted.internet import ssl |
- from twisted.test.ssl_helpers import ClientTLSContext |
-except ImportError: |
- def _noSSL(): |
- # ugh, make pyflakes happy. |
- global SSL |
- global ssl |
- SSL = ssl = None |
- _noSSL() |
- |
-certPath = util.sibpath(__file__, "server.pem") |
- |
-class UnintelligentProtocol(basic.LineReceiver): |
- pretext = [ |
- "first line", |
- "last thing before tls starts", |
- "STARTTLS"] |
- |
- posttext = [ |
- "first thing after tls started", |
- "last thing ever"] |
- |
- def connectionMade(self): |
- for l in self.pretext: |
- self.sendLine(l) |
- |
- def lineReceived(self, line): |
- if line == "READY": |
- self.transport.startTLS(ClientTLSContext(), self.factory.client) |
- for l in self.posttext: |
- self.sendLine(l) |
- self.transport.loseConnection() |
- |
- |
-class LineCollector(basic.LineReceiver): |
- def __init__(self, doTLS, fillBuffer=0): |
- self.doTLS = doTLS |
- self.fillBuffer = fillBuffer |
- |
- def connectionMade(self): |
- self.factory.rawdata = '' |
- self.factory.lines = [] |
- |
- def lineReceived(self, line): |
- self.factory.lines.append(line) |
- if line == 'STARTTLS': |
- if self.fillBuffer: |
- for x in range(500): |
- self.sendLine('X'*1000) |
- self.sendLine('READY') |
- if self.doTLS: |
- ctx = ServerTLSContext( |
- privateKeyFileName=certPath, |
- certificateFileName=certPath, |
- ) |
- self.transport.startTLS(ctx, self.factory.server) |
- else: |
- self.setRawMode() |
- |
- def rawDataReceived(self, data): |
- self.factory.rawdata += data |
- self.factory.done = 1 |
- |
- def connectionLost(self, reason): |
- self.factory.done = 1 |
- |
- |
-class SingleLineServerProtocol(protocol.Protocol): |
- def connectionMade(self): |
- self.transport.identifier = 'SERVER' |
- self.transport.write("+OK <some crap>\r\n") |
- self.transport.getPeerCertificate() |
- |
- |
-class RecordingClientProtocol(protocol.Protocol): |
- def connectionMade(self): |
- self.transport.identifier = 'CLIENT' |
- self.buffer = [] |
- self.transport.getPeerCertificate() |
- |
- def dataReceived(self, data): |
- self.factory.buffer.append(data) |
- |
- |
-class ImmediatelyDisconnectingProtocol(protocol.Protocol): |
- def connectionMade(self): |
- self.transport.loseConnection() |
- |
- def connectionLost(self, reason): |
- self.factory.connectionDisconnected.callback(None) |
- |
- |
-class AlmostImmediatelyDisconnectingProtocol(protocol.Protocol): |
- def connectionMade(self): |
- # Twisted's SSL support is terribly broken. |
- reactor.callLater(0.1, self.transport.loseConnection) |
- |
- def connectionLost(self, reason): |
- self.factory.connectionDisconnected.callback(reason) |
- |
- |
-def generateCertificateObjects(organization, organizationalUnit): |
- pkey = crypto.PKey() |
- pkey.generate_key(crypto.TYPE_RSA, 512) |
- req = crypto.X509Req() |
- subject = req.get_subject() |
- subject.O = organization |
- subject.OU = organizationalUnit |
- req.set_pubkey(pkey) |
- req.sign(pkey, "md5") |
- |
- # Here comes the actual certificate |
- cert = crypto.X509() |
- cert.set_serial_number(1) |
- cert.gmtime_adj_notBefore(0) |
- cert.gmtime_adj_notAfter(60) # Testing certificates need not be long lived |
- cert.set_issuer(req.get_subject()) |
- cert.set_subject(req.get_subject()) |
- cert.set_pubkey(req.get_pubkey()) |
- cert.sign(pkey, "md5") |
- |
- return pkey, req, cert |
- |
- |
-def generateCertificateFiles(basename, organization, organizationalUnit): |
- pkey, req, cert = generateCertificateObjects(organization, organizationalUnit) |
- |
- for ext, obj, dumpFunc in [ |
- ('key', pkey, crypto.dump_privatekey), |
- ('req', req, crypto.dump_certificate_request), |
- ('cert', cert, crypto.dump_certificate)]: |
- fName = os.extsep.join((basename, ext)) |
- fObj = file(fName, 'w') |
- fObj.write(dumpFunc(crypto.FILETYPE_PEM, obj)) |
- fObj.close() |
- |
- |
-class ContextGeneratingMixin: |
- def makeContextFactory(self, org, orgUnit, *args, **kwArgs): |
- base = self.mktemp() |
- generateCertificateFiles(base, org, orgUnit) |
- serverCtxFactory = ssl.DefaultOpenSSLContextFactory( |
- os.extsep.join((base, 'key')), |
- os.extsep.join((base, 'cert')), |
- *args, **kwArgs) |
- |
- return base, serverCtxFactory |
- |
- def setupServerAndClient(self, clientArgs, clientKwArgs, serverArgs, serverKwArgs): |
- self.clientBase, self.clientCtxFactory = self.makeContextFactory( |
- *clientArgs, **clientKwArgs) |
- self.serverBase, self.serverCtxFactory = self.makeContextFactory( |
- *serverArgs, **serverKwArgs) |
- |
- |
-if SSL is not None: |
- class ServerTLSContext(ssl.DefaultOpenSSLContextFactory): |
- isClient = 0 |
- def __init__(self, *args, **kw): |
- kw['sslmethod'] = SSL.TLSv1_METHOD |
- ssl.DefaultOpenSSLContextFactory.__init__(self, *args, **kw) |
- |
- |
-class StolenTCPTestCase(ProperlyCloseFilesMixin, WriteDataTestCase): |
- """ |
- For SSL transports, test many of the same things which are tested for |
- TCP transports. |
- """ |
- def createServer(self, address, portNumber, factory): |
- contextFactory = ssl.CertificateOptions() |
- return reactor.listenSSL( |
- portNumber, factory, contextFactory, interface=address) |
- |
- |
- def connectClient(self, address, portNumber, clientCreator): |
- contextFactory = ssl.CertificateOptions() |
- return clientCreator.connectSSL(address, portNumber, contextFactory) |
- |
- |
- def getHandleExceptionType(self): |
- return SSL.SysCallError |
- |
- |
- def getHandleErrorCode(self): |
- # Windows 2000 SP 4 and Windows XP SP 2 give back WSAENOTSOCK for |
- # SSL.Connection.write for some reason. |
- if platform.getType() == 'win32': |
- return errno.WSAENOTSOCK |
- return ProperlyCloseFilesMixin.getHandleErrorCode(self) |
- |
- |
-class TLSTestCase(unittest.TestCase): |
- fillBuffer = 0 |
- |
- port = None |
- clientProto = None |
- serverProto = None |
- |
- def tearDown(self): |
- if self.clientProto is not None and self.clientProto.transport is not None: |
- self.clientProto.transport.loseConnection() |
- if self.serverProto is not None and self.serverProto.transport is not None: |
- self.serverProto.transport.loseConnection() |
- |
- if self.port is not None: |
- return defer.maybeDeferred(self.port.stopListening) |
- |
- def _runTest(self, clientProto, serverProto, clientIsServer=False): |
- self.clientProto = clientProto |
- cf = self.clientFactory = protocol.ClientFactory() |
- cf.protocol = lambda: clientProto |
- if clientIsServer: |
- cf.server = 0 |
- else: |
- cf.client = 1 |
- |
- self.serverProto = serverProto |
- sf = self.serverFactory = protocol.ServerFactory() |
- sf.protocol = lambda: serverProto |
- if clientIsServer: |
- sf.client = 0 |
- else: |
- sf.server = 1 |
- |
- if clientIsServer: |
- inCharge = cf |
- else: |
- inCharge = sf |
- inCharge.done = 0 |
- |
- port = self.port = reactor.listenTCP(0, sf, interface="127.0.0.1") |
- portNo = port.getHost().port |
- |
- reactor.connectTCP('127.0.0.1', portNo, cf) |
- |
- i = 0 |
- while i < 1000 and not inCharge.done: |
- reactor.iterate(0.01) |
- i += 1 |
- self.failUnless( |
- inCharge.done, |
- "Never finished reading all lines: %s" % (inCharge.lines,)) |
- |
- |
- def testTLS(self): |
- self._runTest(UnintelligentProtocol(), LineCollector(1, self.fillBuffer)) |
- self.assertEquals( |
- self.serverFactory.lines, |
- UnintelligentProtocol.pretext + UnintelligentProtocol.posttext |
- ) |
- |
- |
- def testUnTLS(self): |
- self._runTest(UnintelligentProtocol(), LineCollector(0, self.fillBuffer)) |
- self.assertEquals( |
- self.serverFactory.lines, |
- UnintelligentProtocol.pretext |
- ) |
- self.failUnless(self.serverFactory.rawdata, "No encrypted bytes received") |
- |
- |
- def testBackwardsTLS(self): |
- self._runTest(LineCollector(1, self.fillBuffer), UnintelligentProtocol(), True) |
- self.assertEquals( |
- self.clientFactory.lines, |
- UnintelligentProtocol.pretext + UnintelligentProtocol.posttext |
- ) |
- |
- |
- |
-_bufferedSuppression = trial_util.suppress( |
- message="startTLS with unwritten buffered data currently doesn't work " |
- "right. See issue #686. Closing connection.", |
- category=RuntimeWarning) |
- |
- |
-class SpammyTLSTestCase(TLSTestCase): |
- """ |
- Test TLS features with bytes sitting in the out buffer. |
- """ |
- fillBuffer = 1 |
- |
- def testTLS(self): |
- return TLSTestCase.testTLS(self) |
- testTLS.suppress = [_bufferedSuppression] |
- testTLS.todo = "startTLS doesn't empty buffer before starting TLS. :(" |
- |
- |
- def testBackwardsTLS(self): |
- return TLSTestCase.testBackwardsTLS(self) |
- testBackwardsTLS.suppress = [_bufferedSuppression] |
- testBackwardsTLS.todo = "startTLS doesn't empty buffer before starting TLS. :(" |
- |
- |
-class BufferingTestCase(unittest.TestCase): |
- port = None |
- connector = None |
- serverProto = None |
- clientProto = None |
- |
- def tearDown(self): |
- if self.serverProto is not None and self.serverProto.transport is not None: |
- self.serverProto.transport.loseConnection() |
- if self.clientProto is not None and self.clientProto.transport is not None: |
- self.clientProto.transport.loseConnection() |
- if self.port is not None: |
- return defer.maybeDeferred(self.port.stopListening) |
- |
- def testOpenSSLBuffering(self): |
- serverProto = self.serverProto = SingleLineServerProtocol() |
- clientProto = self.clientProto = RecordingClientProtocol() |
- |
- server = protocol.ServerFactory() |
- client = self.client = protocol.ClientFactory() |
- |
- server.protocol = lambda: serverProto |
- client.protocol = lambda: clientProto |
- client.buffer = [] |
- |
- sCTX = ssl.DefaultOpenSSLContextFactory(certPath, certPath) |
- cCTX = ssl.ClientContextFactory() |
- |
- port = self.port = reactor.listenSSL(0, server, sCTX, interface='127.0.0.1') |
- reactor.connectSSL('127.0.0.1', port.getHost().port, client, cCTX) |
- |
- i = 0 |
- while i < 5000 and not client.buffer: |
- i += 1 |
- reactor.iterate() |
- |
- self.assertEquals(client.buffer, ["+OK <some crap>\r\n"]) |
- |
- |
-class ConnectionLostTestCase(unittest.TestCase, ContextGeneratingMixin): |
- |
- def testImmediateDisconnect(self): |
- org = "twisted.test.test_ssl" |
- self.setupServerAndClient( |
- (org, org + ", client"), {}, |
- (org, org + ", server"), {}) |
- |
- # Set up a server, connect to it with a client, which should work since our verifiers |
- # allow anything, then disconnect. |
- serverProtocolFactory = protocol.ServerFactory() |
- serverProtocolFactory.protocol = protocol.Protocol |
- self.serverPort = serverPort = reactor.listenSSL(0, |
- serverProtocolFactory, self.serverCtxFactory) |
- |
- clientProtocolFactory = protocol.ClientFactory() |
- clientProtocolFactory.protocol = ImmediatelyDisconnectingProtocol |
- clientProtocolFactory.connectionDisconnected = defer.Deferred() |
- clientConnector = reactor.connectSSL('127.0.0.1', |
- serverPort.getHost().port, clientProtocolFactory, self.clientCtxFactory) |
- |
- return clientProtocolFactory.connectionDisconnected.addCallback( |
- lambda ignoredResult: self.serverPort.stopListening()) |
- |
- def testFailedVerify(self): |
- org = "twisted.test.test_ssl" |
- self.setupServerAndClient( |
- (org, org + ", client"), {}, |
- (org, org + ", server"), {}) |
- |
- def verify(*a): |
- return False |
- self.clientCtxFactory.getContext().set_verify(SSL.VERIFY_PEER, verify) |
- |
- serverConnLost = defer.Deferred() |
- serverProtocol = protocol.Protocol() |
- serverProtocol.connectionLost = serverConnLost.callback |
- serverProtocolFactory = protocol.ServerFactory() |
- serverProtocolFactory.protocol = lambda: serverProtocol |
- self.serverPort = serverPort = reactor.listenSSL(0, |
- serverProtocolFactory, self.serverCtxFactory) |
- |
- clientConnLost = defer.Deferred() |
- clientProtocol = protocol.Protocol() |
- clientProtocol.connectionLost = clientConnLost.callback |
- clientProtocolFactory = protocol.ClientFactory() |
- clientProtocolFactory.protocol = lambda: clientProtocol |
- clientConnector = reactor.connectSSL('127.0.0.1', |
- serverPort.getHost().port, clientProtocolFactory, self.clientCtxFactory) |
- |
- dl = defer.DeferredList([serverConnLost, clientConnLost], consumeErrors=True) |
- return dl.addCallback(self._cbLostConns) |
- |
- def _cbLostConns(self, results): |
- (sSuccess, sResult), (cSuccess, cResult) = results |
- |
- self.failIf(sSuccess) |
- self.failIf(cSuccess) |
- |
- acceptableErrors = [SSL.Error] |
- |
- # Rather than getting a verification failure on Windows, we are getting |
- # a connection failure. Without something like sslverify proxying |
- # in-between we can't fix up the platform's errors, so let's just |
- # specifically say it is only OK in this one case to keep the tests |
- # passing. Normally we'd like to be as strict as possible here, so |
- # we're not going to allow this to report errors incorrectly on any |
- # other platforms. |
- |
- if platform.isWindows(): |
- from twisted.internet.error import ConnectionLost |
- acceptableErrors.append(ConnectionLost) |
- |
- sResult.trap(*acceptableErrors) |
- cResult.trap(*acceptableErrors) |
- |
- return self.serverPort.stopListening() |
- |
- |
-if interfaces.IReactorSSL(reactor, None) is None: |
- for tCase in [StolenTCPTestCase, TLSTestCase, SpammyTLSTestCase, |
- BufferingTestCase, ConnectionLostTestCase]: |
- tCase.skip = "Reactor does not support SSL, cannot run SSL tests" |